mirror of https://github.com/apache/druid.git
improve groupBy query granularity translation, with a 2x query performance improvement when issued from the SQL layer (#11379)
* improve groupBy query granularity translation when issued from the SQL layer
* fix style
* use a virtual column to determine the timestampResult granularity
* don't apply postaggregators on compute nodes
* relocate constants
* fix order-by correctness issue
* fix UT
* use easier-to-understand code in DefaultLimitSpec
* address comment
* roll back using a virtual column to determine the timestampResult granularity
* fix style
* fix style
* address the comment
* add more detailed documentation explaining the tradeoff
* address the comment
* address the comment
parent e228a84d91
commit e39ff44481
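The change threads three new query-context keys (added to GroupByQuery in the first hunk below) from the SQL planner down to the groupBy engine. As a minimal sketch, assuming "d1" is the output name of a TIME_FLOOR(__time TO DAY) dimension, the SQL layer ends up attaching context entries like these; the key names come from this commit, while the dimension name and values are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.query.groupby.GroupByQuery;

    // Illustrative values only: "d1" and its index are assumptions for this example.
    Map<String, Object> context = new HashMap<>();
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, "d1");           // output name of the time-floor dimension
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 1);        // its position in the dimension list
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, Granularities.DAY);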
@@ -89,6 +89,9 @@ import java.util.stream.Collectors;
 public class GroupByQuery extends BaseQuery<ResultRow>
 {
   public static final String CTX_KEY_SORT_BY_DIMS_FIRST = "sortByDimsFirst";
+  public static final String CTX_TIMESTAMP_RESULT_FIELD = "timestampResultField";
+  public static final String CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY = "timestampResultFieldGranularity";
+  public static final String CTX_TIMESTAMP_RESULT_FIELD_INDEX = "timestampResultFieldInOriginalDimensions";
   private static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";

   private static final Comparator<ResultRow> NON_GRANULAR_TIME_COMP =
@@ -220,6 +220,16 @@ public class DefaultLimitSpec implements LimitSpec
       sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
     }

+    if (!sortingNeeded) {
+      String timestampField = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD);
+      if (timestampField != null && !timestampField.isEmpty()) {
+        int timestampResultFieldIndex = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
+        sortingNeeded = query.getContextSortByDimsFirst()
+                        ? timestampResultFieldIndex != query.getDimensions().size() - 1
+                        : timestampResultFieldIndex != 0;
+      }
+    }
+
     final Function<Sequence<ResultRow>, Sequence<ResultRow>> sortAndLimitFn;

     if (sortingNeeded) {
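To see what the new DefaultLimitSpec check decides, here is a hedged walk-through with invented values: dimensions [d0, d1], where d1 is the time-floor column, and sortByDimsFirst=false, so the engine emits rows ordered by timestamp first:

    boolean sortByDimsFirst = false;
    int timestampResultFieldIndex = 1;   // d1 is the second of two dimensions
    int dimensionCount = 2;
    boolean sortingNeeded = sortByDimsFirst
                            ? timestampResultFieldIndex != dimensionCount - 1   // engine emits timestamp last
                            : timestampResultFieldIndex != 0;                   // engine emits timestamp first
    // sortingNeeded == true: engine order (timestamp, d0) differs from SQL order (d0, d1),
    // so the broker must still re-sort before applying the limit.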
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
@@ -35,6 +36,7 @@ import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.collect.Utils;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -213,7 +215,64 @@ public class GroupByStrategyV2 implements GroupByStrategy
       context.put("finalize", false);
       context.put(GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2);
       context.put(CTX_KEY_OUTERMOST, false);
-      if (query.getUniversalTimestamp() != null) {
+
+      Granularity granularity = query.getGranularity();
+      List<DimensionSpec> dimensionSpecs = query.getDimensions();
+      // CTX_TIMESTAMP_RESULT_FIELD is set in DruidQuery.java
+      final String timestampResultField = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD);
+      final boolean hasTimestampResultField = (timestampResultField != null && !timestampResultField.isEmpty())
+                                              && query.getContextBoolean(CTX_KEY_OUTERMOST, true)
+                                              && !query.isApplyLimitPushDown();
+      int timestampResultFieldIndex = 0;
+      if (hasTimestampResultField) {
+        // For a SQL query like "GROUP BY city_id, time_floor(__time TO day)",
+        // the original translated query has granularity=all and dimensions [d0, d1].
+        // The better plan is granularity=day and dimensions [d0],
+        // but that changes the ResultRow structure from [d0, d1] to [__time, d0],
+        // so the structure must be restored to [d0, d1] (actually [d0, __time]) before postAggs are applied.
+        //
+        // That is the general idea of this optimization.
+        // From a coding perspective, however, granularity=all and the time-floor dimension are referenced in
+        // many places (e.g. subtotals, having, grouping sets, post aggs), so removing that dimension from
+        // query.dimensions, or changing the granularity, would require fixes in many places.
+        // To keep the code simpler, the optimization is implemented as an inner process change of the groupBy
+        // engine: most of the code lives in GroupByStrategyV2 and covers the process change between the broker
+        // and the compute nodes. Basic logic such as nested queries and subtotals is kept unchanged; it still
+        // sees granularity=all and the original dimensions (including the time-floor one).
+        //
+        // The tradeoff is that GroupByStrategyV2 behaves differently depending on the query contexts set in
+        // DruidQuery. In other words, the query produced by "EXPLAIN PLAN FOR SELECT ..." does not exactly
+        // match the native query ACTUALLY executed: the granularity and dimensions differ slightly.
+        // Part of the query planning logic is now handled in GroupByStrategyV2, not only in
+        // DruidQuery.toGroupByQuery().
+        final Granularity timestampResultFieldGranularity
+            = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+        dimensionSpecs =
+            query.getDimensions()
+                 .stream()
+                 .filter(dimensionSpec -> !dimensionSpec.getOutputName().equals(timestampResultField))
+                 .collect(Collectors.toList());
+        granularity = timestampResultFieldGranularity;
+        // When timestampResultField is the last dimension, sortByDimsFirst must be set to true;
+        // otherwise the downstream results are sorted by the row's timestamp first, which breaks the final ordering.
+        timestampResultFieldIndex = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
+        if (!query.getContextSortByDimsFirst() && timestampResultFieldIndex == query.getDimensions().size() - 1) {
+          context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, true);
+        }
+        // When timestampResultField is the first dimension and sortByDimsFirst=true,
+        // it is effectively the same as sortByDimsFirst=false.
+        if (query.getContextSortByDimsFirst() && timestampResultFieldIndex == 0) {
+          context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
+        }
+        // When hasTimestampResultField=true and timestampResultField is neither the first nor the last dimension,
+        // DefaultLimitSpec will always do the reordering.
+      }
+      final int timestampResultFieldIndexInOriginalDimensions = timestampResultFieldIndex;
+      if (query.getUniversalTimestamp() != null && !hasTimestampResultField) {
+        // universalTimestamp works only when granularity is all,
+        // and hasTimestampResultField likewise applies only when granularity is all.
+        // fudgeTimestamp must not be used when hasTimestampResultField=true, because the row's actual timestamp is used.
         context.put(CTX_KEY_FUDGE_TIMESTAMP, String.valueOf(query.getUniversalTimestamp().getMillis()));
       }
@@ -228,10 +287,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
             query.getQuerySegmentSpec(),
             query.getVirtualColumns(),
             query.getDimFilter(),
-            query.getGranularity(),
-            query.getDimensions(),
+            granularity,
+            dimensionSpecs,
             query.getAggregatorSpecs(),
-            query.getPostAggregatorSpecs(),
+            // Don't apply postaggregators on compute nodes
+            ImmutableList.of(),
             // Don't do "having" clause until the end of this method.
             null,
             // Potentially pass limit down the stack (i.e. limit pushdown). Notes:
@@ -251,9 +311,26 @@ public class GroupByStrategyV2 implements GroupByStrategy
       // pushed-down subquery (CTX_KEY_EXECUTING_NESTED_QUERY).

       if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)
-          || query.getPostAggregatorSpecs().isEmpty()
           || query.getContextBoolean(GroupByQueryConfig.CTX_KEY_EXECUTING_NESTED_QUERY, false)) {
         return mergedResults;
+      } else if (query.getPostAggregatorSpecs().isEmpty()) {
+        if (!hasTimestampResultField) {
+          return mergedResults;
+        }
+        return Sequences.map(
+            mergedResults,
+            row -> {
+              final ResultRow resultRow = ResultRow.create(query.getResultRowSizeWithoutPostAggregators());
+              moveOrReplicateTimestampInRow(
+                  query,
+                  timestampResultFieldIndexInOriginalDimensions,
+                  row,
+                  resultRow
+              );
+
+              return resultRow;
+            }
+        );
       } else {
         return Sequences.map(
             mergedResults,
@@ -263,8 +340,17 @@ public class GroupByStrategyV2 implements GroupByStrategy
             final ResultRow rowWithPostAggregations = ResultRow.create(query.getResultRowSizeWithPostAggregators());

             // Copy everything that comes before the postaggregations.
-            for (int i = 0; i < query.getResultRowPostAggregatorStart(); i++) {
-              rowWithPostAggregations.set(i, row.get(i));
+            if (hasTimestampResultField) {
+              moveOrReplicateTimestampInRow(
+                  query,
+                  timestampResultFieldIndexInOriginalDimensions,
+                  row,
+                  rowWithPostAggregations
+              );
+            } else {
+              for (int i = 0; i < query.getResultRowPostAggregatorStart(); i++) {
+                rowWithPostAggregations.set(i, row.get(i));
+              }
             }

             // Compute postaggregations. We need to do this with a result-row map because PostAggregator.compute
@@ -285,6 +371,34 @@ public class GroupByStrategyV2 implements GroupByStrategy
       }
     }

+  private void moveOrReplicateTimestampInRow(
+      GroupByQuery query,
+      int timestampResultFieldIndexInOriginalDimensions,
+      ResultRow before,
+      ResultRow after
+  )
+  {
+    // d1 is the __time dimension.
+    // When query.granularity=all: convert [__time, d0] to [d0, d1] (actually [d0, __time]).
+    // When query.granularity!=all: convert [__time, d0] to [__time, d0, d1] (actually [__time, d0, __time]).
+    // Overall: re-insert the removed d1 at the position it was removed from, and drop the leading __time if granularity=all.
+    Object theTimestamp = before.get(0);
+    int expectedDimensionStartInAfterRow = 0;
+    if (query.getResultRowHasTimestamp()) {
+      expectedDimensionStartInAfterRow = 1;
+      after.set(0, theTimestamp);
+    }
+    int timestampResultFieldIndexInAfterRow = timestampResultFieldIndexInOriginalDimensions + expectedDimensionStartInAfterRow;
+    for (int i = expectedDimensionStartInAfterRow; i < timestampResultFieldIndexInAfterRow; i++) {
+      // index 0 in the before row is the timestamp, so i + 1 is where the dimensions start in the before row
+      after.set(i, before.get(i + 1));
+    }
+    after.set(timestampResultFieldIndexInAfterRow, theTimestamp);
+    for (int i = timestampResultFieldIndexInAfterRow + 1; i < before.length() + expectedDimensionStartInAfterRow; i++) {
+      after.set(i, before.get(i - expectedDimensionStartInAfterRow));
+    }
+  }
+
   @Override
   public Sequence<ResultRow> applyPostProcessing(Sequence<ResultRow> results, GroupByQuery query)
   {
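To make the row fix-up concrete, here is a standalone sketch of what moveOrReplicateTimestampInRow does in the granularity=all case, using plain arrays instead of ResultRow; the values are invented:

    // Engine-side row for granularity=DAY, dimensions [d0]: a leading day bucket, then d0.
    Object[] before = {1466985600000L, "Paris"};                // [__time, d0]
    // SQL-side row the broker must return for dimensions [d0, d1], where d1 is the day bucket.
    Object[] after = new Object[2];
    int timestampResultFieldIndex = 1;                          // d1 was the second dimension originally
    Object theTimestamp = before[0];
    // The outer row has no leading timestamp (granularity=all), so dimensions start at index 0.
    for (int i = 0; i < timestampResultFieldIndex; i++) {
      after[i] = before[i + 1];                                 // shift d0 left over the removed timestamp slot
    }
    after[timestampResultFieldIndex] = theTimestamp;            // re-insert the day bucket as d1
    // Any dimensions after d1 would be copied with the same shift; none exist here.
    // after == ["Paris", 1466985600000L], i.e. [d0, d1] as the SQL layer expects.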
@@ -389,7 +503,9 @@ public class GroupByStrategyV2 implements GroupByStrategy
         )
         .withVirtualColumns(VirtualColumns.EMPTY)
         .withDimFilter(null)
-        .withSubtotalsSpec(null);
+        .withSubtotalsSpec(null)
+        // the timestampResult optimization does not apply to the subtotals scenario, so disable it
+        .withOverriddenContext(ImmutableMap.of(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, ""));

     resultSupplierOne = GroupByRowProcessor.process(
         baseSubtotalQuery,
@@ -60,7 +60,9 @@ import org.junit.runners.Parameterized;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 /**
  * This class is for testing both timeseries and groupBy queries with the same set of queries.
@@ -111,7 +113,9 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
     final List<VirtualColumn> virtualColumns = new ArrayList<>(
         Arrays.asList(tsQuery.getVirtualColumns().getVirtualColumns())
     );
+    Map<String, Object> theContext = tsQuery.getContext();
     if (timeDimension != null) {
+      theContext = new HashMap<>(tsQuery.getContext());
       final PeriodGranularity granularity = (PeriodGranularity) tsQuery.getGranularity();
       virtualColumns.add(
           new ExpressionVirtualColumn(
@@ -121,6 +125,10 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
               TestExprMacroTable.INSTANCE
           )
       );
+
+      theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timeDimension);
+      theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
+      theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
     }

     GroupByQuery newQuery = GroupByQuery
@@ -137,7 +145,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
         .setAggregatorSpecs(tsQuery.getAggregatorSpecs())
         .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
         .setVirtualColumns(VirtualColumns.create(virtualColumns))
-        .setContext(tsQuery.getContext())
+        .setContext(theContext)
         .build();

     return Sequences.map(
@@ -55,6 +55,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
@@ -1002,7 +1003,7 @@ public class DruidQuery
       postAggregators.addAll(sorting.getProjection().getPostAggregators());
     }

-    return new GroupByQuery(
+    GroupByQuery query = new GroupByQuery(
         newDataSource,
         filtration.getQuerySegmentSpec(),
         getVirtualColumns(true),
@@ -1016,6 +1017,62 @@ public class DruidQuery
         grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
+    // We don't apply the timestamp computation optimization yet when the limit is pushed down. Maybe someday.
+    if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
+      return query;
+    }
+    Map<String, Object> theContext = new HashMap<>();
+
+    Granularity queryGranularity = null;
+
+    // For a SQL query like "GROUP BY city_id, time_floor(__time TO day)",
+    // the original translated query has granularity=all and dimensions [d0, d1].
+    // The better plan is granularity=day and dimensions [d0],
+    // but that changes the ResultRow structure from [d0, d1] to [__time, d0],
+    // so the structure must be restored to [d0, d1] (actually [d0, __time]) before postAggs are applied.
+    //
+    // That is the general idea of this optimization.
+    // From a coding perspective, however, granularity=all and the time-floor dimension are referenced in
+    // many places (e.g. subtotals, having, grouping sets, post aggs), so removing that dimension from
+    // query.dimensions, or changing the granularity, would require fixes in many places.
+    // To keep the code simpler, the optimization is implemented as an inner process change of the groupBy
+    // engine: most of the code lives in GroupByStrategyV2 and covers the process change between the broker
+    // and the compute nodes. Basic logic such as nested queries and subtotals is kept unchanged; it still
+    // sees granularity=all and the original dimensions.
+    //
+    // The tradeoff is that GroupByStrategyV2 behaves differently depending on the query contexts set below.
+    // In other words, the query produced by "EXPLAIN PLAN FOR SELECT ..." does not exactly match the native
+    // query ACTUALLY executed: the granularity and dimensions differ slightly.
+    // Part of the query planning logic is now handled in GroupByStrategyV2.
+    if (!grouping.getDimensions().isEmpty()) {
+      for (DimensionExpression dimensionExpression : grouping.getDimensions()) {
+        Granularity granularity = Expressions.toQueryGranularity(
+            dimensionExpression.getDruidExpression(),
+            plannerContext.getExprMacroTable()
+        );
+        if (granularity == null) {
+          continue;
+        }
+        if (queryGranularity != null) {
+          // Grouping by more than one timestamp_floor,
+          // e.g. "GROUP BY timestamp_floor(__time TO DAY), timestamp_floor(__time TO HOUR)".
+          queryGranularity = null;
+          break;
+        }
+        queryGranularity = granularity;
+        int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
+        // These settings only affect the innermost query sent to the downstream compute nodes.
+        theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, dimensionExpression.getOutputName());
+        theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, timestampDimensionIndexInDimensions);
+        theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, queryGranularity);
+      }
+    }
+    if (queryGranularity == null) {
+      return query;
+    }
+    return query.withOverriddenContext(theContext);
   }

   /**
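A small sketch of the granularity resolution above, with invented inputs: each array entry stands for the result of Expressions.toQueryGranularity(...) applied to one GROUP BY dimension, where null means the dimension is not a time-floor expression (Granularity and Granularities are Druid's real classes; the inputs are assumptions):

    Granularity[] perDimension = {null, Granularities.DAY};  // e.g. city_id, TIME_FLOOR(__time TO DAY)
    Granularity queryGranularity = null;
    for (Granularity g : perDimension) {
      if (g == null) {
        continue;                   // plain dimension: ignore
      }
      if (queryGranularity != null) {
        queryGranularity = null;    // a second time-floor dimension: give up on the optimization
        break;
      }
      queryGranularity = g;
    }
    // queryGranularity == DAY here; with two time-floor dimensions it would end up null,
    // and toGroupByQuery would return the query without the context overrides.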
@@ -24,6 +24,8 @@ import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -48,6 +50,7 @@ import org.junit.runner.RunWith;

 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;

 @RunWith(JUnitParamsRunner.class)
@@ -114,7 +117,13 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
                         "a0",
                         "a0:a"
                     )))
-                    .setContext(queryContext)
+                    .setContext(
+                        withTimestampResultContext(
+                            queryContext,
+                            "d0",
+                            Granularities.DAY
+                        )
+                    )
                     .setGranularity(new AllGranularity())
                     .build()
             )
@@ -207,7 +216,13 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
                     )
                 )
                 .setAggregatorSpecs(new CountAggregatorFactory("a0"))
-                .setContext(queryContext)
+                .setContext(
+                    withTimestampResultContext(
+                        queryContext,
+                        "d0",
+                        Granularities.DAY
+                    )
+                )
                 .setGranularity(new AllGranularity())
                 .build()
         )
@@ -294,7 +309,13 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
                     )
                 )
                 .setAggregatorSpecs(new CountAggregatorFactory("a0"))
-                .setContext(queryContext)
+                .setContext(
+                    withTimestampResultContext(
+                        queryContext,
+                        "d0",
+                        Granularities.DAY
+                    )
+                )
                 .setGranularity(new AllGranularity())
                 .build()
         )
@@ -381,7 +402,13 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
                     selector("city", "A", null),
                     not(selector("country", null, null))
                 ))
-                .setContext(queryContext)
+                .setContext(
+                    withTimestampResultContext(
+                        queryContext,
+                        "d0",
+                        Granularities.DAY
+                    )
+                )
                 .setGranularity(new AllGranularity())
                 .build()
         )
@@ -468,7 +495,13 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
                     selector("city", "A", null),
                     not(selector("country", null, null))
                 ))
-                .setContext(queryContext)
+                .setContext(
+                    withTimestampResultContext(
+                        queryContext,
+                        "d0",
+                        Granularities.DAY
+                    )
+                )
                 .setGranularity(new AllGranularity())
                 .build()
         )
@@ -503,4 +536,16 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
     );
   }

+  private Map<String, Object> withTimestampResultContext(
+      Map<String, Object> input,
+      String timestampResultField,
+      Granularity granularity
+  )
+  {
+    Map<String, Object> output = new HashMap<>(input);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
+    return output;
+  }
 }
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -7988,7 +7989,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of("d0", "d1"),
                         ImmutableList.of("d0", "d2")
                     ))
-                    .setContext(QUERY_CONTEXT_DEFAULT)
+                    .setContext(withTimestampResultContext(
+                        QUERY_CONTEXT_DEFAULT,
+                        "d0",
+                        0,
+                        Granularities.DAY
+                    ))
                     .build()
             )
         )
@@ -10097,7 +10103,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         Integer.MAX_VALUE
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, Granularities.YEAR))
                 .build()
         ),
         NullHandling.replaceWithDefault() ?
@@ -13359,7 +13365,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         Integer.MAX_VALUE
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         NullHandling.replaceWithDefault() ?
@@ -13426,7 +13432,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13492,7 +13498,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13640,7 +13646,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13699,7 +13705,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13757,7 +13763,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13818,7 +13824,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of("d2")
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d2", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13879,7 +13885,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         ImmutableList.of()
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -13952,7 +13958,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         Integer.MAX_VALUE
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -14020,7 +14026,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         Integer.MAX_VALUE
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -14089,7 +14095,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         1
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -17283,7 +17289,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         100
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.of(
@@ -17356,7 +17362,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         100
                     )
                 )
-                .setContext(QUERY_CONTEXT_DEFAULT)
+                .setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
                 .build()
         ),
         ImmutableList.<Object[]>builder().add(
@@ -17925,4 +17931,18 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         )
     );
   }

+  private Map<String, Object> withTimestampResultContext(
+      Map<String, Object> input,
+      String timestampResultField,
+      int timestampResultFieldIndex,
+      Granularity granularity
+  )
+  {
+    Map<String, Object> output = new HashMap<>(input);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
+    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, timestampResultFieldIndex);
+    return output;
+  }
 }