Fix timeseries query constructor when postAggregator has an expression reading timestamp result column (#10198)

* Fix timeseries query constructor when postAggregator has an expression reading timestamp result column

* fix npe

* Fix postAgg referencing timestampResultField and add a test for it

* fix test

* doc

* revert doc
Jihoon Son 2020-07-27 10:54:44 -07:00 committed by GitHub
parent 2f28be3f2a
commit 63c1746fe4
5 changed files with 183 additions and 28 deletions


@@ -78,11 +78,14 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
{
super(dataSource, querySegmentSpec, descending, context, granularity);
// The below should be executed after context is initialized.
final String timestampField = getTimestampResultField();
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(
ImmutableList.of(),
timestampField == null ? ImmutableList.of() : ImmutableList.of(timestampField),
this.aggregatorSpecs,
postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs
);
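For context (a sketch, not part of the patch): with the timestamp result field passed to Queries.prepareAggregations() as an extra known output name, a query like the one below now constructs successfully, even though its post-aggregator reads a column that only exists because of the CTX_TIMESTAMP_RESULT_FIELD context entry. The builder calls mirror the new test added further down; before this change, the constructor rejected such a query because "myTimestamp" was not among the field names prepareAggregations() knew about.

    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                  .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
                                  .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
                                  .granularity(QueryRunnerTestHelper.DAY_GRAN)
                                  // "myTimestamp" is not an aggregator output; it is the copied timestamp column
                                  .postAggregators(new FieldAccessPostAggregator("timestampInPostAgg", "myTimestamp"))
                                  .context(ImmutableMap.of(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, "myTimestamp"))
                                  .build();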


@@ -412,7 +412,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public RowSignature resultArraySignature(TimeseriesQuery query)
{
RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
rowSignatureBuilder.addTimeColumn();
if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
@@ -460,6 +459,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
final TimeseriesResultValue holder = result.getValue();
final Map<String, Object> values = new HashMap<>(holder.getBaseObject());
if (calculatePostAggs) {
// If "timestampResultField" is set, we must include a copy of the timestamp in the result.
// This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
// The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
// This should be done before computing post aggregators since they can reference "timestampResultField".
if (StringUtils.isNotEmpty(query.getTimestampResultField()) && result.getTimestamp() != null) {
final DateTime timestamp = result.getTimestamp();
values.put(query.getTimestampResultField(), timestamp.getMillis());
}
if (!query.getPostAggregatorSpecs().isEmpty()) {
// put non finalized aggregators for calculating dependent post Aggregators
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -469,13 +476,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
values.put(postAgg.getName(), postAgg.compute(values));
}
}
// If "timestampResultField" is set, we must include a copy of the timestamp in the result.
// This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
// The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
if (StringUtils.isNotEmpty(query.getTimestampResultField()) && result.getTimestamp() != null) {
final DateTime timestamp = result.getTimestamp();
values.put(query.getTimestampResultField(), timestamp.getMillis());
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
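The move above is an ordering fix: post-aggregators resolve their inputs by looking them up in the values map, so the copied timestamp has to be present before they are computed. A minimal sketch of that dependency (illustration only, not part of the patch):

    final Map<String, Object> values = new HashMap<>(holder.getBaseObject());
    // 1. copy the timestamp under its result field name (done first after this change)
    values.put(query.getTimestampResultField(), result.getTimestamp().getMillis());
    // 2. a post-aggregator referencing that field now finds it in the map
    final PostAggregator postAgg = new FieldAccessPostAggregator("timestampInPostAgg", query.getTimestampResultField());
    values.put(postAgg.getName(), postAgg.compute(values));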


@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
@@ -40,9 +41,15 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -52,10 +59,11 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*
* This class is for testing both timeseries and groupBy queries with the same set of queries.
*/
@RunWith(Parameterized.class)
public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
@@ -99,15 +107,36 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
toolChest
);
final String timeDimension = tsQuery.getTimestampResultField();
final List<VirtualColumn> virtualColumns = new ArrayList<>(
Arrays.asList(tsQuery.getVirtualColumns().getVirtualColumns())
);
if (timeDimension != null) {
final PeriodGranularity granularity = (PeriodGranularity) tsQuery.getGranularity();
virtualColumns.add(
new ExpressionVirtualColumn(
"v0",
StringUtils.format("timestamp_floor(__time, '%s')", granularity.getPeriod()),
ValueType.LONG,
TestExprMacroTable.INSTANCE
)
);
}
GroupByQuery newQuery = GroupByQuery
.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setDimensions(
timeDimension == null
? ImmutableList.of()
: ImmutableList.of(new DefaultDimensionSpec("v0", timeDimension, ValueType.LONG))
)
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.setVirtualColumns(tsQuery.getVirtualColumns())
.setVirtualColumns(VirtualColumns.create(virtualColumns))
.setContext(tsQuery.getContext())
.build();
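To make the translation concrete (an illustrative example assuming DAY granularity and a timestamp result field named "myTimestamp", not part of the patch), the groupBy equivalent built above carries roughly this virtual column and dimension:

    // timestamp_floor over the period of the query granularity; DAY corresponds to 'P1D'
    final VirtualColumn floorTime = new ExpressionVirtualColumn(
        "v0",
        "timestamp_floor(__time, 'P1D')",
        ValueType.LONG,
        TestExprMacroTable.INSTANCE
    );
    // exposed to the groupBy under the timeseries query's timestamp result field name
    final DimensionSpec timeDimension = new DefaultDimensionSpec("v0", "myTimestamp", ValueType.LONG);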
@@ -239,14 +268,28 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
@Override
public void testTimeseriesWithTimestampResultFieldContextForArrayResponse()
{
// Skip this test because the timeseries test expects an extra column to be created (map from the timestamp_floor
// of the timestamp dimension) but group by doesn't do this.
// Cannot vectorize with an expression virtual column
if (!vectorize) {
super.testTimeseriesWithTimestampResultFieldContextForArrayResponse();
}
}
@Override
public void testTimeseriesWithTimestampResultFieldContextForMapResponse()
{
// Skip this test because the timeseries test expects an extra column to be created (map from the timestamp_floor
// of the timestamp dimension) but group by doesn't do this.
// Cannot vectorize with an expression virtual column
if (!vectorize) {
super.testTimeseriesWithTimestampResultFieldContextForMapResponse();
}
}
@Override
@Test
public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
{
// Cannot vectorize with an expression virtual column
if (!vectorize) {
super.testTimeseriesWithPostAggregatorReferencingTimestampResultField();
}
}
}


@@ -47,6 +47,7 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.AndDimFilter;
@@ -1619,7 +1620,6 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(aggregatorFactoryList)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.context(ImmutableMap.of("skipEmptyBuckets", "true"))
.descending(descending)
.context(makeContext(ImmutableMap.of("skipEmptyBuckets", "true")))
.build();
@@ -2489,9 +2489,14 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.descending(descending)
.context(ImmutableMap.of(
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
))
.context(
makeContext(
ImmutableMap.of(
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
"skipEmptyBuckets", true
)
)
)
.build();
Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME, query.getTimestampResultField());
@@ -2518,6 +2523,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
.filter(eachIndex -> !"0.0".equals(eachIndex))
.toArray(String[]::new);
final Long expectedLast = descending ?
QueryRunnerTestHelper.EARLIEST.getMillis() :
@@ -2545,7 +2553,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
if (QueryRunnerTestHelper.SKIPPED_DAY.getMillis() != current) {
Assert.assertEquals(
Doubles.tryParse(expectedIndex[count]).doubleValue(),
Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
(Double) result[3],
(Double) result[3] * 1e-6
);
@@ -2555,7 +2563,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
0.02
);
Assert.assertEquals(
new Double(expectedIndex[count]) + 13L + 1L,
new Double(expectedIndexToUse[count]) + 13L + 1L,
(Double) result[5],
(Double) result[5] * 1e-6
);
@@ -2572,7 +2580,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
0.02
);
Assert.assertEquals(
new Double(expectedIndex[count]) + 1L,
new Double(expectedIndexToUse[count]) + 1L,
(Double) result[5],
(Double) result[5] * 1e-6
);
@@ -2612,9 +2620,14 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.descending(descending)
.context(ImmutableMap.of(
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
))
.context(
makeContext(
ImmutableMap.of(
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
"skipEmptyBuckets", true
)
)
)
.build();
Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME, query.getTimestampResultField());
@@ -2624,6 +2637,9 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
.filter(eachIndex -> !"0.0".equals(eachIndex))
.toArray(String[]::new);
final DateTime expectedLast = descending ?
QueryRunnerTestHelper.EARLIEST :
@@ -2655,13 +2671,13 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
if (!QueryRunnerTestHelper.SKIPPED_DAY.equals(current)) {
Assert.assertEquals(
result.toString(),
Doubles.tryParse(expectedIndex[count]).doubleValue(),
Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
value.getDoubleMetric("index").doubleValue(),
value.getDoubleMetric("index").doubleValue() * 1e-6
);
Assert.assertEquals(
result.toString(),
new Double(expectedIndex[count]) +
new Double(expectedIndexToUse[count]) +
13L + 1L,
value.getDoubleMetric("addRowsIndexConstant"),
value.getDoubleMetric("addRowsIndexConstant") * 1e-6
@@ -2681,7 +2697,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
);
Assert.assertEquals(
result.toString(),
new Double(expectedIndex[count]) + 1L,
new Double(expectedIndexToUse[count]) + 1L,
value.getDoubleMetric("addRowsIndexConstant"),
value.getDoubleMetric("addRowsIndexConstant") * 1e-6
);
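A note on the expectedIndexToUse arrays introduced in both tests above: because the query context now sets "skipEmptyBuckets" to true, the bucket for QueryRunnerTestHelper.SKIPPED_DAY is no longer returned at all, so its "0.0" placeholder has to be dropped from the expected values to keep them aligned with the actual results:

    // skipEmptyBuckets removes the empty day from the results, so drop its placeholder as well
    final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
                                              .filter(eachIndex -> !"0.0".equals(eachIndex))
                                              .toArray(String[]::new);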
@@ -2811,6 +2827,53 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
Assert.assertEquals(10, list.size());
}
@Test
public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.DAY_GRAN)
.filters(QueryRunnerTestHelper.MARKET_DIMENSION, "spot")
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.postAggregators(
new FieldAccessPostAggregator("timestampInPostAgg", "myTimestamp")
)
.descending(descending)
.context(
makeContext(
ImmutableMap.of(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, "myTimestamp")
)
)
.build();
final DateTime aprilFirst = DateTimes.of("2011-04-01");
final DateTime aprilSecond = DateTimes.of("2011-04-02");
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<>(
aprilFirst,
new TimeseriesResultValue(
ImmutableMap.of(
"myTimestamp", aprilFirst.getMillis(),
"timestampInPostAgg", aprilFirst.getMillis()
)
)
),
new Result<>(
aprilSecond,
new TimeseriesResultValue(
ImmutableMap.of(
"myTimestamp", aprilSecond.getMillis(),
"timestampInPostAgg", aprilSecond.getMillis()
)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
assertExpectedResults(expectedResults, results);
}
private Map<String, Object> makeContext()
{
return makeContext(ImmutableMap.of());


@@ -203,6 +203,52 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testGroupByWithPostAggregatorReferencingTimeFloorColumnOnTimeseries() throws Exception
{
cannotVectorize();
testQuery(
"SELECT TIME_FORMAT(\"date\", 'yyyy-MM'), SUM(x)\n"
+ "FROM (\n"
+ " SELECT\n"
+ " FLOOR(__time to hour) as \"date\",\n"
+ " COUNT(*) as x\n"
+ " FROM foo\n"
+ " GROUP BY 1\n"
+ ")\n"
+ "GROUP BY 1",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.HOUR)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(getTimeseriesContextWithFloorTime(TIMESERIES_CONTEXT_DEFAULT, "d0"))
.build()
)
.setInterval(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_format(\"d0\",'yyyy-MM','UTC')",
ValueType.STRING
)
)
.setGranularity(Granularities.ALL)
.addDimension(new DefaultDimensionSpec("v0", "_d0"))
.addAggregator(new LongSumAggregatorFactory("_a0", "a0"))
.build()
),
ImmutableList.of(
new Object[]{"2000-01", 3L},
new Object[]{"2001-01", 3L}
)
);
}
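The helper used in the expected native query above, getTimeseriesContextWithFloorTime, is not shown in this diff. A hedged sketch of what it presumably does (its real definition lives in the Calcite test base class): layer the timestamp-result-field entry on top of the default timeseries context so the inner timeseries query reports "d0" as its timestamp result column.

    // Assumption: the helper only adds CTX_TIMESTAMP_RESULT_FIELD to the given default context.
    static Map<String, Object> getTimeseriesContextWithFloorTime(Map<String, Object> context, String timestampResultField)
    {
      return ImmutableMap.<String, Object>builder()
                         .putAll(context)
                         .put(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField)
                         .build();
    }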
@Test
public void testJoinOuterGroupByAndSubqueryHasLimit() throws Exception
{