Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)

* Support array based results in timeBoundary query

* Fix bug with query interval in timeBoundary

* Convert min(__time) and max(__time) SQL queries to timeBoundary

* Add tests for timeBoundary backed SQL queries

* Fix query plans for existing tests

* fixup! Convert min(__time) and max(__time) SQL queries to timeBoundary

* fixup! Add tests for timeBoundary backed SQL queries

* fixup! Fix bug with query interval in timeBoundary
Authored by Rohan Garg on 2022-04-25 20:48:58 +05:30 · committed by GitHub
parent b47316b844
commit 95694b5afa
9 changed files with 441 additions and 62 deletions
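
At a glance: a query like SELECT MAX(__time) FROM foo used to plan to a timeseries query that scanned rows through a longMax aggregator; with this patch it plans to a timeBoundary query, which Druid can typically answer from segment metadata without scanning. A minimal before/after sketch using the Druids builders that appear in the diffs below ("foo" is just an example datasource name):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;

class MaxTimePlanSketch
{
  // Before: SELECT MAX(__time) FROM foo became a timeseries query over eternity.
  static final TimeseriesQuery OLD_PLAN =
      Druids.newTimeseriesQueryBuilder()
            .dataSource("foo")
            .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
            .granularity(Granularities.ALL)
            .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
            .build();

  // After: the same SQL becomes a timeBoundary query bound to maxTime.
  static final TimeBoundaryQuery NEW_PLAN =
      Druids.newTimeBoundaryQueryBuilder()
            .dataSource("foo")
            .bound(TimeBoundaryQuery.MAX_TIME)
            .build();
}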

View File: TimeBoundaryQuery.java

@@ -48,6 +48,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
   private static final QuerySegmentSpec DEFAULT_SEGMENT_SPEC = new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY);

   public static final String MAX_TIME = "maxTime";
   public static final String MIN_TIME = "minTime";
+  public static final String MAX_TIME_ARRAY_OUTPUT_NAME = "maxTimeArrayOutputName";
+  public static final String MIN_TIME_ARRAY_OUTPUT_NAME = "minTimeArrayOutputName";

   private static final byte CACHE_TYPE_ID = 0x0;

View File: TimeBoundaryQueryQueryToolChest.java

@@ -42,6 +42,8 @@ import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.MetricManipulationFn;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.LogicalSegment;

 import java.nio.ByteBuffer;
@@ -224,4 +226,40 @@ public class TimeBoundaryQueryQueryToolChest
       }
     };
   }
+
+  @Override
+  public RowSignature resultArraySignature(TimeBoundaryQuery query)
+  {
+    if (query.isMinTime() || query.isMaxTime()) {
+      RowSignature.Builder builder = RowSignature.builder();
+      String outputName = query.isMinTime()
+                          ? query.getContextValue(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME)
+                          : query.getContextValue(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MAX_TIME);
+      return builder.add(outputName, ColumnType.LONG).build();
+    }
+    return super.resultArraySignature(query);
+  }
+
+  @Override
+  public Sequence<Object[]> resultsAsArrays(
+      TimeBoundaryQuery query,
+      Sequence<Result<TimeBoundaryResultValue>> resultSequence
+  )
+  {
+    if (query.isMaxTime()) {
+      return Sequences.map(
+          resultSequence,
+          result -> result == null || result.getValue() == null || result.getValue().getMaxTime() == null
+                    ? null
+                    : new Object[]{result.getValue().getMaxTime().getMillis()}
+      );
+    } else if (query.isMinTime()) {
+      return Sequences.map(
+          resultSequence,
+          result -> result == null || result.getValue() == null || result.getValue().getMinTime() == null
+                    ? null
+                    : new Object[]{result.getValue().getMinTime().getMillis()}
+      );
+    } else {
+      return super.resultsAsArrays(query, resultSequence);
+    }
+  }
 }
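
Why the new context keys: the SQL layer consumes native results as positional arrays with planner-assigned column names (such as "a0"), whereas timeBoundary results are natively keyed by "minTime"/"maxTime". The context key carries the SQL output name into the native query so resultArraySignature can report it. A minimal sketch mirroring the tests below ("testing" is a placeholder datasource):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.Druids;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
import org.apache.druid.segment.column.RowSignature;

class ArraySignatureSketch
{
  static RowSignature demo()
  {
    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
                                    .dataSource("testing")
                                    .bound(TimeBoundaryQuery.MAX_TIME)
                                    // The SQL planner picked "a0" as the output column name.
                                    .context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"))
                                    .build();
    // Yields one LONG column named "a0" (millis), instead of the default "maxTime".
    return new TimeBoundaryQueryQueryToolChest().resultArraySignature(query);
  }
}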

View File: TimeBoundaryQueryRunnerFactory.java

@@ -22,6 +22,7 @@ package org.apache.druid.query.timeboundary;
 import com.google.common.base.Function;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.BaseSequence;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.filter.Filters;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;

 import java.util.Iterator;
 import java.util.List;
@@ -155,7 +157,7 @@ public class TimeBoundaryQueryRunnerFactory
           final DateTime minTime;
           final DateTime maxTime;

-          if (legacyQuery.getFilter() != null) {
+          if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
             minTime = getTimeBoundary(adapter, legacyQuery, false);
             if (minTime == null) {
               maxTime = null;
@@ -183,6 +185,15 @@ public class TimeBoundaryQueryRunnerFactory
           {
           }
+
+          private boolean queryIntervalContainsAdapterInterval()
+          {
+            List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
+            if (queryIntervals.size() != 1) {
+              throw new IAE("Should only have one interval, got[%s]", queryIntervals);
+            }
+            return queryIntervals.get(0).contains(adapter.getInterval());
+          }
         }
     );
   }
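
This is the interval bug fix named in the commit message: timeBoundary has a metadata fast path that returns the segment's own min/max time, which is only correct when the query interval fully covers the segment. The new check forces a row scan otherwise, the same path used for filtered queries. A small sketch of the containment test, assuming a hypothetical segment covering January 2011:

import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;

class ContainmentSketch
{
  public static void main(String[] args)
  {
    Interval segment = Intervals.of("2011-01-01/2011-02-01");
    // Query interval covers the whole segment: metadata min/max are valid as-is.
    System.out.println(Intervals.of("2010/2012").contains(segment));               // true
    // Query interval is narrower than the segment: rows must be scanned for min/max.
    System.out.println(Intervals.of("2011-01-15/2011-01-16").contains(segment));   // false
  }
}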

View File: TimeBoundaryQueryQueryToolChestTest.java

@@ -25,11 +25,14 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.LogicalSegment;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -289,6 +292,43 @@ public class TimeBoundaryQueryQueryToolChestTest
     Assert.assertEquals(7, segments.size());
   }

+  @Test(expected = UOE.class)
+  public void testResultArraySignature()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .build();
+    new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+  }
+
+  @Test
+  public void testResultArraySignatureWithMinTime()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .bound(TimeBoundaryQuery.MIN_TIME)
+                                                .context(ImmutableMap.of(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "foo"))
+                                                .build();
+    RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+    RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+    expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+    Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+  }
+
+  @Test
+  public void testResultArraySignatureWithMaxTime()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .bound(TimeBoundaryQuery.MAX_TIME)
+                                                .context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "foo"))
+                                                .build();
+    RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+    RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+    expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+    Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+  }
+
   @Test
   public void testCacheStrategy() throws Exception
   {

View File: TimeBoundaryQueryRunnerTest.java

@@ -19,11 +19,14 @@

 package org.apache.druid.query.timeboundary;

+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.io.CharSource;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;
@@ -35,6 +38,7 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.context.ConcurrentResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
@@ -47,6 +51,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -183,6 +188,32 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-01-16T00:00:00.000Z"), maxTime);
   }

+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeFilteredTimeBoundaryQuery() throws IOException
+  {
+    QueryRunner customRunner = getCustomRunner();
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .intervals(
+                                                    new MultipleIntervalSegmentSpec(
+                                                        ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
+                                                    )
+                                                )
+                                                .build();
+    List<Result<TimeBoundaryResultValue>> results =
+        customRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
+
+    Assert.assertTrue(Iterables.size(results) > 0);
+
+    TimeBoundaryResultValue val = results.iterator().next().getValue();
+    DateTime minTime = val.getMinTime();
+    DateTime maxTime = val.getMaxTime();
+
+    Assert.assertEquals(DateTimes.of("2011-01-15T00:00:00.000Z"), minTime);
+    Assert.assertEquals(DateTimes.of("2011-01-15T01:00:00.000Z"), maxTime);
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testFilteredTimeBoundaryQueryNoMatches() throws IOException
@@ -216,6 +247,22 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
   }

+  @Test(expected = UOE.class)
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryArrayResults()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .bound(null)
+                                                .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        timeBoundaryQuery,
+        runner.run(QueryPlus.wrap(timeBoundaryQuery), context)
+    ).toList();
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeBoundaryMax()
@@ -235,6 +282,26 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
   }

+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryMaxArraysResults()
+  {
+    TimeBoundaryQuery maxTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                   .dataSource("testing")
+                                                   .bound(TimeBoundaryQuery.MAX_TIME)
+                                                   .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    List<Object[]> maxTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        maxTimeBoundaryQuery,
+        runner.run(QueryPlus.wrap(maxTimeBoundaryQuery), context)
+    ).toList();
+
+    Long maxTimeMillis = (Long) maxTime.get(0)[0];
+    Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), new DateTime(maxTimeMillis, DateTimeZone.UTC));
+    Assert.assertEquals(1, maxTime.size());
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeBoundaryMin()
@@ -254,6 +321,26 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertNull(maxTime);
   }

+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryMinArraysResults()
+  {
+    TimeBoundaryQuery minTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                   .dataSource("testing")
+                                                   .bound(TimeBoundaryQuery.MIN_TIME)
+                                                   .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    List<Object[]> minTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        minTimeBoundaryQuery,
+        runner.run(QueryPlus.wrap(minTimeBoundaryQuery), context)
+    ).toList();
+
+    Long minTimeMillis = (Long) minTime.get(0)[0];
+    Assert.assertEquals(DateTimes.of("2011-01-12T00:00:00.000Z"), new DateTime(minTimeMillis, DateTimeZone.UTC));
+    Assert.assertEquals(1, minTime.size());
+  }
+
   @Test
   public void testMergeResults()
   {

View File: QueryStackTests.java

@@ -55,6 +55,8 @@ import org.apache.druid.query.scan.ScanQueryConfig;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanQueryQueryToolChest;
 import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -344,6 +346,7 @@ public class QueryStackTests
                 )
             )
             .put(GroupByQuery.class, groupByQueryRunnerFactory)
+            .put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER))
             .build()
     );
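
For context on this one-line change: since SQL can now plan to TimeBoundaryQuery, the test query stack needs a runner factory registered for that query type, or the new plans would have nothing to execute against. A sketch of just the mapping, with the rest of the conglomerate wiring elided:

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;

import java.util.Map;

class ConglomerateSketch
{
  // Maps each native query class to the factory that can run it.
  static final Map<Class<? extends Query>, QueryRunnerFactory> FACTORIES =
      ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
                  .put(TimeBoundaryQuery.class,
                       new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER))
                  .build();
}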

View File: DruidQuery.java

@@ -49,7 +49,11 @@ import org.apache.druid.query.DataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -59,6 +63,7 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.topn.DimensionTopNMetricSpec;
 import org.apache.druid.query.topn.InvertedTopNMetricSpec;
@@ -799,6 +804,11 @@ public class DruidQuery
       }
     }

+    final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
+    if (timeBoundaryQuery != null) {
+      return timeBoundaryQuery;
+    }
+
     final TimeseriesQuery tsQuery = toTimeseriesQuery(queryFeatureInspector);
     if (tsQuery != null) {
       return tsQuery;
@@ -822,6 +832,69 @@ public class DruidQuery
     throw new CannotBuildQueryException("Cannot convert query parts into an actual query");
   }

+  /**
+   * Return this query as a TimeBoundary query, or null if this query is not compatible with TimeBoundary.
+   *
+   * @return a TimeBoundaryQuery if possible. null if it is not possible to construct one.
+   */
+  @Nullable
+  private TimeBoundaryQuery toTimeBoundaryQuery()
+  {
+    if (grouping == null
+        || grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
+        || grouping.getHavingFilter() != null
+        || selectProjection != null) {
+      return null;
+    }
+
+    if (sorting != null && sorting.getOffsetLimit().hasOffset()) {
+      // TimeBoundary cannot handle offsets.
+      return null;
+    }
+
+    // Currently only handles max(__time) or min(__time), not both at once.
+    if (grouping.getDimensions().isEmpty() &&
+        grouping.getPostAggregators().isEmpty() &&
+        grouping.getAggregatorFactories().size() == 1) {
+      boolean minTime;
+      AggregatorFactory aggregatorFactory = Iterables.getOnlyElement(grouping.getAggregatorFactories());
+      if (aggregatorFactory instanceof LongMaxAggregatorFactory ||
+          aggregatorFactory instanceof LongMinAggregatorFactory) {
+        SimpleLongAggregatorFactory minMaxFactory = (SimpleLongAggregatorFactory) aggregatorFactory;
+        String fieldName = minMaxFactory.getFieldName();
+        if (fieldName == null ||
+            !fieldName.equals(ColumnHolder.TIME_COLUMN_NAME) ||
+            (minMaxFactory.getExpression() != null && !minMaxFactory.getExpression().isEmpty())) {
+          return null;
+        }
+        minTime = aggregatorFactory instanceof LongMinAggregatorFactory;
+      } else {
+        return null;
+      }
+      final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration(
+          dataSource,
+          filter,
+          virtualColumnRegistry
+      );
+      final DataSource newDataSource = dataSourceFiltrationPair.lhs;
+      final Filtration filtration = dataSourceFiltrationPair.rhs;
+      String bound = minTime ? TimeBoundaryQuery.MIN_TIME : TimeBoundaryQuery.MAX_TIME;
+      HashMap<String, Object> context = new HashMap<>(plannerContext.getQueryContext().getMergedParams());
+      if (minTime) {
+        context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+      } else {
+        context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+      }
+      return new TimeBoundaryQuery(
+          newDataSource,
+          filtration.getQuerySegmentSpec(),
+          bound,
+          filtration.getDimFilter(),
+          context
+      );
+    }
+    return null;
+  }
+
   /**
    * Return this query as a Timeseries query, or null if this query is not compatible with Timeseries.
    *
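
Putting the guards above together: the conversion fires only for a whole-table aggregation whose single aggregator is MIN or MAX on the raw __time column. Some illustrative cases (indicative only; the exact plan depends on the planner):

// Converted to timeBoundary by toTimeBoundaryQuery():
//   SELECT MAX(__time) FROM foo
//   SELECT MIN(__time) FROM foo WHERE dim1 = 'abc'     -- filter survives as the query's DimFilter
//
// Not converted (falls through to timeseries/groupBy):
//   SELECT MIN(__time), MAX(__time) FROM foo           -- two aggregators; see issue #12479
//   SELECT dim1, MAX(__time) FROM foo GROUP BY dim1    -- grouping dimensions present
//   SELECT MAX(__time + 1) FROM foo                    -- expression input, not the raw column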

View File: CalciteJoinQueryTest.java

@@ -72,6 +72,7 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.topn.DimensionTopNMetricSpec;
 import org.apache.druid.query.topn.InvertedTopNMetricSpec;
 import org.apache.druid.query.topn.NumericTopNMetricSpec;
@@ -2387,6 +2388,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
       cannotVectorize();
     }

+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT DISTINCT __time FROM druid.foo WHERE __time IN (SELECT MAX(__time) FROM druid.foo)",
         queryContext,
@@ -2396,14 +2399,12 @@
             join(
                 new TableDataSource(CalciteTests.DATASOURCE1),
                 new QueryDataSource(
-                    Druids.newTimeseriesQueryBuilder()
+                    Druids.newTimeBoundaryQueryBuilder()
                           .dataSource(CalciteTests.DATASOURCE1)
                           .intervals(querySegmentSpec(Filtration.eternity()))
-                          .granularity(Granularities.ALL)
-                          .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                          .context(QUERY_CONTEXT_DEFAULT)
+                          .bound(TimeBoundaryQuery.MAX_TIME)
+                          .context(maxTimeQueryContext)
                           .build()
-                          .withOverriddenContext(queryContext)
                 ),
                 "j0.",
                 equalsCondition(
@@ -2433,6 +2434,8 @@
     // Cannot vectorize JOIN operator.
     cannotVectorize();

+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT DISTINCT __time FROM druid.foo WHERE __time NOT IN (SELECT MAX(__time) FROM druid.foo)",
         queryContext,
@@ -2446,13 +2449,12 @@
                 GroupByQuery
                     .builder()
                     .setDataSource(
-                        Druids.newTimeseriesQueryBuilder()
+                        Druids.newTimeBoundaryQueryBuilder()
                               .dataSource(CalciteTests.DATASOURCE1)
                               .intervals(querySegmentSpec(Filtration.eternity()))
-                              .granularity(Granularities.ALL)
-                              .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                              .context(QUERY_CONTEXT_DEFAULT)
+                              .bound(TimeBoundaryQuery.MAX_TIME)
+                              .context(maxTimeQueryContext)
                               .build()
                     )
                     .setInterval(querySegmentSpec(Filtration.eternity()))
                     .setGranularity(Granularities.ALL)
@@ -3566,6 +3568,8 @@
       cannotVectorize();
     }

+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM foo\n"
         + "WHERE dim1 IN ('abc', 'def')"
@@ -3580,28 +3584,26 @@
             join(
                 new TableDataSource(CalciteTests.DATASOURCE1),
                 new QueryDataSource(
-                    Druids.newTimeseriesQueryBuilder()
+                    Druids.newTimeBoundaryQueryBuilder()
                           .dataSource(CalciteTests.DATASOURCE1)
                           .intervals(querySegmentSpec(Filtration.eternity()))
-                          .granularity(Granularities.ALL)
+                          .bound(TimeBoundaryQuery.MAX_TIME)
                           .filters(selector("cnt", "1", null))
-                          .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                          .context(QUERY_CONTEXT_DEFAULT)
+                          .context(maxTimeQueryContext)
                           .build()
                 ),
                 "j0.",
                 "(\"__time\" == \"j0.a0\")",
                 JoinType.INNER
             ),
             new QueryDataSource(
-                Druids.newTimeseriesQueryBuilder()
+                Druids.newTimeBoundaryQueryBuilder()
                       .dataSource(CalciteTests.DATASOURCE1)
                       .intervals(querySegmentSpec(Filtration.eternity()))
-                      .granularity(Granularities.ALL)
+                      .bound(TimeBoundaryQuery.MAX_TIME)
                       .filters(not(selector("cnt", "2", null)))
-                      .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                      .context(QUERY_CONTEXT_DEFAULT)
+                      .context(maxTimeQueryContext)
                       .build()
             ),
             "_j0.",
             "(\"__time\" == \"_j0.a0\")",
@@ -3626,6 +3628,10 @@
   {
     cannotVectorize();

+    Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+    minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM foo\n"
         + "WHERE dim1 IN ('abc', 'def')\n"
@@ -3641,13 +3647,12 @@
             join(
                 new TableDataSource(CalciteTests.DATASOURCE1),
                 new QueryDataSource(
-                    Druids.newTimeseriesQueryBuilder()
+                    Druids.newTimeBoundaryQueryBuilder()
                           .dataSource(CalciteTests.DATASOURCE1)
                           .intervals(querySegmentSpec(Filtration.eternity()))
-                          .granularity(Granularities.ALL)
-                          .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                          .context(QUERY_CONTEXT_DEFAULT)
+                          .bound(TimeBoundaryQuery.MAX_TIME)
+                          .context(maxTimeQueryContext)
                           .build()
                 ),
                 "j0.",
                 "(\"__time\" == \"j0.a0\")",
@@ -3657,15 +3662,12 @@
                 GroupByQuery.builder()
                             .setDataSource(
                                 new QueryDataSource(
-                                    Druids.newTimeseriesQueryBuilder()
+                                    Druids.newTimeBoundaryQueryBuilder()
                                           .dataSource(CalciteTests.DATASOURCE1)
                                           .intervals(querySegmentSpec(Filtration.eternity()))
-                                          .granularity(Granularities.ALL)
-                                          .aggregators(
-                                              new LongMinAggregatorFactory("a0", "__time")
-                                          )
-                                          .context(QUERY_CONTEXT_DEFAULT)
+                                          .bound(TimeBoundaryQuery.MIN_TIME)
+                                          .context(minTimeQueryContext)
                                           .build()
                                 )
                             )
                             .setInterval(querySegmentSpec(Filtration.eternity()))
@@ -3730,6 +3732,10 @@
   {
     cannotVectorize();

+    Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+    minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM\n"
         + "foo\n"
@@ -3745,26 +3751,24 @@
             join(
                 new TableDataSource(CalciteTests.DATASOURCE1),
                 new QueryDataSource(
-                    Druids.newTimeseriesQueryBuilder()
+                    Druids.newTimeBoundaryQueryBuilder()
                           .dataSource(CalciteTests.DATASOURCE1)
                           .intervals(querySegmentSpec(Filtration.eternity()))
-                          .granularity(Granularities.ALL)
-                          .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                          .context(QUERY_CONTEXT_DEFAULT)
+                          .bound(TimeBoundaryQuery.MAX_TIME)
+                          .context(maxTimeQueryContext)
                           .build()
                 ),
                 "j0.",
                 "(\"__time\" == \"j0.a0\")",
                 JoinType.INNER
             ),
             new QueryDataSource(
-                Druids.newTimeseriesQueryBuilder()
+                Druids.newTimeBoundaryQueryBuilder()
                       .dataSource(CalciteTests.DATASOURCE1)
                       .intervals(querySegmentSpec(Filtration.eternity()))
-                      .granularity(Granularities.ALL)
-                      .aggregators(new LongMinAggregatorFactory("a0", "__time"))
-                      .context(QUERY_CONTEXT_DEFAULT)
+                      .bound(TimeBoundaryQuery.MIN_TIME)
+                      .context(minTimeQueryContext)
                       .build()
             ),
             "_j0.",
             "(\"__time\" == \"_j0.a0\")",

View File: CalciteTimeBoundaryQueryTest.java (new file)

@@ -0,0 +1,121 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.sql.calcite;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.junit.Test;

import java.util.HashMap;

public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
{
  // __time for foo is [2000-01-01, 2000-01-02, 2000-01-03, 2001-01-01, 2001-01-02, 2001-01-03]
  @Test
  public void testMaxTimeQuery() throws Exception
  {
    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
    context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
    testQuery(
        "SELECT MAX(__time) AS maxTime FROM foo",
        ImmutableList.of(
            Druids.newTimeBoundaryQueryBuilder()
                  .dataSource("foo")
                  .bound(TimeBoundaryQuery.MAX_TIME)
                  .context(context)
                  .build()
        ),
        ImmutableList.of(new Object[]{DateTimes.of("2001-01-03").getMillis()})
    );
  }

  @Test
  public void testMinTimeQuery() throws Exception
  {
    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
    testQuery(
        "SELECT MIN(__time) AS minTime FROM foo",
        ImmutableList.of(
            Druids.newTimeBoundaryQueryBuilder()
                  .dataSource("foo")
                  .bound(TimeBoundaryQuery.MIN_TIME)
                  .context(context)
                  .build()
        ),
        ImmutableList.of(new Object[]{DateTimes.of("2000-01-01").getMillis()})
    );
  }

  @Test
  public void testMinTimeQueryWithFilters() throws Exception
  {
    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
    testQuery(
        "SELECT MIN(__time) AS minTime FROM foo where __time >= '2001-01-01' and __time < '2003-01-01'",
        ImmutableList.of(
            Druids.newTimeBoundaryQueryBuilder()
                  .dataSource("foo")
                  .intervals(
                      new MultipleIntervalSegmentSpec(
                          ImmutableList.of(Intervals.of("2001-01-01T00:00:00.000Z/2003-01-01T00:00:00.000Z"))
                      )
                  )
                  .bound(TimeBoundaryQuery.MIN_TIME)
                  .context(context)
                  .build()
        ),
        ImmutableList.of(new Object[]{DateTimes.of("2001-01-01").getMillis()})
    );
  }

  // Currently, if both min(__time) and max(__time) are present, we don't convert it
  // to a timeBoundary query. (ref: https://github.com/apache/druid/issues/12479)
  @Test
  public void testMinMaxTimeQuery() throws Exception
  {
    testQuery(
        "SELECT MIN(__time) AS minTime, MAX(__time) as maxTime FROM foo",
        ImmutableList.of(
            Druids.newTimeseriesQueryBuilder()
                  .dataSource("foo")
                  .intervals(querySegmentSpec(Filtration.eternity()))
                  .aggregators(
                      new LongMinAggregatorFactory("a0", "__time"),
                      new LongMaxAggregatorFactory("a1", "__time")
                  )
                  .context(QUERY_CONTEXT_DEFAULT)
                  .build()
        ),
        ImmutableList.of(new Object[]{
            DateTimes.of("2000-01-01").getMillis(),
            DateTimes.of("2001-01-03").getMillis()
        })
    );
  }
}