diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java index 8c328ca29ad..240ede642a1 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java @@ -48,6 +48,8 @@ public class TimeBoundaryQuery extends BaseQuery private static final QuerySegmentSpec DEFAULT_SEGMENT_SPEC = new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY); public static final String MAX_TIME = "maxTime"; public static final String MIN_TIME = "minTime"; + public static final String MAX_TIME_ARRAY_OUTPUT_NAME = "maxTimeArrayOutputName"; + public static final String MIN_TIME_ARRAY_OUTPUT_NAME = "minTimeArrayOutputName"; private static final byte CACHE_TYPE_ID = 0x0; diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index d6a402fcc83..9dc0859f4e2 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -42,6 +42,8 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.LogicalSegment; import java.nio.ByteBuffer; @@ -224,4 +226,40 @@ public class TimeBoundaryQueryQueryToolChest } }; } + + @Override + public RowSignature resultArraySignature(TimeBoundaryQuery query) + { + if (query.isMinTime() || query.isMaxTime()) { + RowSignature.Builder builder 
= RowSignature.builder(); + String outputName = query.isMinTime() ? + query.getContextValue(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME) : + query.getContextValue(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MAX_TIME); + return builder.add(outputName, ColumnType.LONG).build(); + } + return super.resultArraySignature(query); + } + + @Override + public Sequence resultsAsArrays( + TimeBoundaryQuery query, + Sequence> resultSequence + ) + { + if (query.isMaxTime()) { + return Sequences.map( + resultSequence, + result -> result == null || result.getValue() == null || result.getValue().getMaxTime() == null ? null : + new Object[]{result.getValue().getMaxTime().getMillis()} + ); + } else if (query.isMinTime()) { + return Sequences.map( + resultSequence, + result -> result == null || result.getValue() == null || result.getValue().getMinTime() == null ? null : + new Object[]{result.getValue().getMinTime().getMillis()} + ); + } else { + return super.resultsAsArrays(query, resultSequence); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 07775d45289..2fe21dafbf4 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -22,6 +22,7 @@ package org.apache.druid.query.timeboundary; import com.google.common.base.Function; import com.google.inject.Inject; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.BaseSequence; @@ -45,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns; import 
org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.filter.Filters; import org.joda.time.DateTime; +import org.joda.time.Interval; import java.util.Iterator; import java.util.List; @@ -155,7 +157,7 @@ public class TimeBoundaryQueryRunnerFactory final DateTime minTime; final DateTime maxTime; - if (legacyQuery.getFilter() != null) { + if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) { minTime = getTimeBoundary(adapter, legacyQuery, false); if (minTime == null) { maxTime = null; @@ -183,6 +185,15 @@ public class TimeBoundaryQueryRunnerFactory { } + + private boolean queryIntervalContainsAdapterInterval() + { + List queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals(); + if (queryIntervals.size() != 1) { + throw new IAE("Should only have one interval, got[%s]", queryIntervals); + } + return queryIntervals.get(0).contains(adapter.getInterval()); + } } ); } diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java index 6ab886fe084..6a96f013605 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java @@ -25,11 +25,14 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.Druids; import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.column.ColumnType; +import 
org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.LogicalSegment; import org.joda.time.Interval; import org.junit.Assert; @@ -289,6 +292,43 @@ public class TimeBoundaryQueryQueryToolChestTest Assert.assertEquals(7, segments.size()); } + @Test(expected = UOE.class) + public void testResultArraySignature() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .build(); + new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery); + } + + @Test + public void testResultArraySignatureWithMinTime() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MIN_TIME) + .context(ImmutableMap.of(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "foo")) + .build(); + RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery); + RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder(); + expectedRowSignatureBuilder.add("foo", ColumnType.LONG); + Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature); + } + + @Test + public void testResultArraySignatureWithMaxTime() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MAX_TIME) + .context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "foo")) + .build(); + RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery); + RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder(); + expectedRowSignatureBuilder.add("foo", ColumnType.LONG); + Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature); + } + @Test public void testCacheStrategy() throws Exception { diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index e1ceab34de6..4b46bbcf879 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -19,11 +19,14 @@ package org.apache.druid.query.timeboundary; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.io.CharSource; import org.apache.commons.lang.StringUtils; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; @@ -35,6 +38,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; @@ -47,6 +51,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.SingleElementPartitionChunk; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -183,6 +188,32 @@ public class TimeBoundaryQueryRunnerTest Assert.assertEquals(DateTimes.of("2011-01-16T00:00:00.000Z"), maxTime); } + @Test + @SuppressWarnings("unchecked") + public void testTimeFilteredTimeBoundaryQuery() throws IOException + { + 
QueryRunner customRunner = getCustomRunner(); + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z")) + ) + ) + .build(); + List> results = + customRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList(); + + Assert.assertTrue(Iterables.size(results) > 0); + + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertEquals(DateTimes.of("2011-01-15T00:00:00.000Z"), minTime); + Assert.assertEquals(DateTimes.of("2011-01-15T01:00:00.000Z"), maxTime); + } + @Test @SuppressWarnings("unchecked") public void testFilteredTimeBoundaryQueryNoMatches() throws IOException @@ -216,6 +247,22 @@ public class TimeBoundaryQueryRunnerTest Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime); } + @Test(expected = UOE.class) + @SuppressWarnings("unchecked") + public void testTimeBoundaryArrayResults() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(null) + .build(); + ResponseContext context = ConcurrentResponseContext.createEmpty(); + context.initializeMissingSegments(); + new TimeBoundaryQueryQueryToolChest().resultsAsArrays( + timeBoundaryQuery, + runner.run(QueryPlus.wrap(timeBoundaryQuery), context) + ).toList(); + } + @Test @SuppressWarnings("unchecked") public void testTimeBoundaryMax() @@ -235,6 +282,26 @@ public class TimeBoundaryQueryRunnerTest Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime); } + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryMaxArraysResults() + { + TimeBoundaryQuery maxTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MAX_TIME) + .build(); + ResponseContext context = 
ConcurrentResponseContext.createEmpty(); + context.initializeMissingSegments(); + List maxTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays( + maxTimeBoundaryQuery, + runner.run(QueryPlus.wrap(maxTimeBoundaryQuery), context) + ).toList(); + + Long maxTimeMillis = (Long) maxTime.get(0)[0]; + Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), new DateTime(maxTimeMillis, DateTimeZone.UTC)); + Assert.assertEquals(1, maxTime.size()); + } + @Test @SuppressWarnings("unchecked") public void testTimeBoundaryMin() @@ -254,6 +321,26 @@ public class TimeBoundaryQueryRunnerTest Assert.assertNull(maxTime); } + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryMinArraysResults() + { + TimeBoundaryQuery minTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MIN_TIME) + .build(); + ResponseContext context = ConcurrentResponseContext.createEmpty(); + context.initializeMissingSegments(); + List minTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays( + minTimeBoundaryQuery, + runner.run(QueryPlus.wrap(minTimeBoundaryQuery), context) + ).toList(); + + Long minTimeMillis = (Long) minTime.get(0)[0]; + Assert.assertEquals(DateTimes.of("2011-01-12T00:00:00.000Z"), new DateTime(minTimeMillis, DateTimeZone.UTC)); + Assert.assertEquals(1, minTime.size()); + } + @Test public void testMergeResults() { diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 9e7c234c0c6..0dfe0d53620 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -55,6 +55,8 @@ import org.apache.druid.query.scan.ScanQueryConfig; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanQueryQueryToolChest; import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import 
org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; @@ -344,6 +346,7 @@ public class QueryStackTests ) ) .put(GroupByQuery.class, groupByQueryRunnerFactory) + .put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)) .build() ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 98f2dfe0450..f9f5083f17b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -49,7 +49,11 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; +import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; @@ -59,6 +63,7 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import 
org.apache.druid.query.topn.InvertedTopNMetricSpec; @@ -799,6 +804,11 @@ public class DruidQuery } } + final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery(); + if (timeBoundaryQuery != null) { + return timeBoundaryQuery; + } + final TimeseriesQuery tsQuery = toTimeseriesQuery(queryFeatureInspector); if (tsQuery != null) { return tsQuery; @@ -822,6 +832,69 @@ public class DruidQuery throw new CannotBuildQueryException("Cannot convert query parts into an actual query"); } + /** + * Return this query as a TimeBoundary query, or null if this query is not compatible with TimeBoundary. + * + * @return a TimeBoundaryQuery if possible. null if it is not possible to construct one. + */ + @Nullable + private TimeBoundaryQuery toTimeBoundaryQuery() + { + if (grouping == null + || grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs()) + || grouping.getHavingFilter() != null + || selectProjection != null) { + return null; + } + + if (sorting != null && sorting.getOffsetLimit().hasOffset()) { + // Timeboundary cannot handle offsets. 
+ return null; + } + + if (grouping.getDimensions().isEmpty() && + grouping.getPostAggregators().isEmpty() && + grouping.getAggregatorFactories().size() == 1) { // currently only handles max(__time) or min(__time) not both + boolean minTime; + AggregatorFactory aggregatorFactory = Iterables.getOnlyElement(grouping.getAggregatorFactories()); + if (aggregatorFactory instanceof LongMaxAggregatorFactory || + aggregatorFactory instanceof LongMinAggregatorFactory) { + SimpleLongAggregatorFactory minMaxFactory = (SimpleLongAggregatorFactory) aggregatorFactory; + String fieldName = minMaxFactory.getFieldName(); + if (fieldName == null || + !fieldName.equals(ColumnHolder.TIME_COLUMN_NAME) || + (minMaxFactory.getExpression() != null && !minMaxFactory.getExpression().isEmpty())) { + return null; + } + minTime = aggregatorFactory instanceof LongMinAggregatorFactory; + } else { + return null; + } + final Pair dataSourceFiltrationPair = getFiltration( + dataSource, + filter, + virtualColumnRegistry + ); + final DataSource newDataSource = dataSourceFiltrationPair.lhs; + final Filtration filtration = dataSourceFiltrationPair.rhs; + String bound = minTime ? TimeBoundaryQuery.MIN_TIME : TimeBoundaryQuery.MAX_TIME; + HashMap context = new HashMap<>(plannerContext.getQueryContext().getMergedParams()); + if (minTime) { + context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName()); + } else { + context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName()); + } + return new TimeBoundaryQuery( + newDataSource, + filtration.getQuerySegmentSpec(), + bound, + filtration.getDimFilter(), + context + ); + } + return null; + } + /** * Return this query as a Timeseries query, or null if this query is not compatible with Timeseries. 
* diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 2d5b3b043dc..1c4227102f4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -72,6 +72,7 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.InvertedTopNMetricSpec; import org.apache.druid.query.topn.NumericTopNMetricSpec; @@ -2387,6 +2388,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest cannotVectorize(); } + Map maxTimeQueryContext = new HashMap<>(queryContext); + maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); testQuery( "SELECT DISTINCT __time FROM druid.foo WHERE __time IN (SELECT MAX(__time) FROM druid.foo)", queryContext, @@ -2396,14 +2399,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest join( new TableDataSource(CalciteTests.DATASOURCE1), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() - .withOverriddenContext(queryContext) + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + .context(maxTimeQueryContext) + .build() ), "j0.", equalsCondition( @@ -2433,6 +2434,8 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest // Cannot vectorize JOIN operator. cannotVectorize(); + Map maxTimeQueryContext = new HashMap<>(queryContext); + maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); testQuery( "SELECT DISTINCT __time FROM druid.foo WHERE __time NOT IN (SELECT MAX(__time) FROM druid.foo)", queryContext, @@ -2446,13 +2449,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest GroupByQuery .builder() .setDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + .context(maxTimeQueryContext) + .build() ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) @@ -3566,6 +3568,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest cannotVectorize(); } + Map maxTimeQueryContext = new HashMap<>(queryContext); + maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); testQuery( "SELECT dim1, COUNT(*) FROM foo\n" + "WHERE dim1 IN ('abc', 'def')" @@ -3580,28 +3584,26 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest join( new TableDataSource(CalciteTests.DATASOURCE1), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(selector("cnt", "1", null)) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + 
.filters(selector("cnt", "1", null)) + .context(maxTimeQueryContext) + .build() ), "j0.", "(\"__time\" == \"j0.a0\")", JoinType.INNER ), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(not(selector("cnt", "2", null))) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + .filters(not(selector("cnt", "2", null))) + .context(maxTimeQueryContext) + .build() ), "_j0.", "(\"__time\" == \"_j0.a0\")", @@ -3626,6 +3628,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest { cannotVectorize(); + Map minTimeQueryContext = new HashMap<>(queryContext); + minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0"); + Map maxTimeQueryContext = new HashMap<>(queryContext); + maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); testQuery( "SELECT dim1, COUNT(*) FROM foo\n" + "WHERE dim1 IN ('abc', 'def')\n" @@ -3641,13 +3647,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest join( new TableDataSource(CalciteTests.DATASOURCE1), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + .context(maxTimeQueryContext) + .build() ), "j0.", "(\"__time\" == \"j0.a0\")", @@ -3657,15 +3662,12 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest GroupByQuery.builder() .setDataSource( new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators( - new LongMinAggregatorFactory("a0", "__time") - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MIN_TIME) + .context(minTimeQueryContext) + .build() ) ) .setInterval(querySegmentSpec(Filtration.eternity())) @@ -3730,6 +3732,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest { cannotVectorize(); + Map minTimeQueryContext = new HashMap<>(queryContext); + minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0"); + Map maxTimeQueryContext = new HashMap<>(queryContext); + maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); testQuery( "SELECT dim1, COUNT(*) FROM\n" + "foo\n" @@ -3745,26 +3751,24 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest join( new TableDataSource(CalciteTests.DATASOURCE1), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(new LongMaxAggregatorFactory("a0", "__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MAX_TIME) + .context(maxTimeQueryContext) + .build() ), "j0.", "(\"__time\" == \"j0.a0\")", JoinType.INNER ), new QueryDataSource( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(new LongMinAggregatorFactory("a0", 
"__time")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .bound(TimeBoundaryQuery.MIN_TIME) + .context(minTimeQueryContext) + .build() ), "_j0.", "(\"__time\" == \"_j0.a0\")", diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java new file mode 100644 index 00000000000..74e42c53654 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; +import org.apache.druid.query.aggregation.LongMinAggregatorFactory; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.junit.Test; + +import java.util.HashMap; + +public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest +{ + // __time for foo is [2000-01-01, 2000-01-02, 2000-01-03, 2001-01-01, 2001-01-02, 2001-01-03] + @Test + public void testMaxTimeQuery() throws Exception + { + HashMap context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0"); + testQuery( + "SELECT MAX(__time) AS maxTime FROM foo", + ImmutableList.of( + Druids.newTimeBoundaryQueryBuilder() + .dataSource("foo") + .bound(TimeBoundaryQuery.MAX_TIME) + .context(context) + .build() + ), + ImmutableList.of(new Object[]{DateTimes.of("2001-01-03").getMillis()}) + ); + } + + @Test + public void testMinTimeQuery() throws Exception + { + HashMap context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0"); + testQuery( + "SELECT MIN(__time) AS minTime FROM foo", + ImmutableList.of( + Druids.newTimeBoundaryQueryBuilder() + .dataSource("foo") + .bound(TimeBoundaryQuery.MIN_TIME) + .context(context) + .build() + ), + ImmutableList.of(new Object[]{DateTimes.of("2000-01-01").getMillis()}) + ); + } + + @Test + public void testMinTimeQueryWithFilters() throws Exception + { + HashMap context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0"); + testQuery( + "SELECT MIN(__time) AS minTime 
FROM foo where __time >= '2001-01-01' and __time < '2003-01-01'", + ImmutableList.of( + Druids.newTimeBoundaryQueryBuilder() + .dataSource("foo") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of("2001-01-01T00:00:00.000Z/2003-01-01T00:00:00.000Z")) + ) + ) + .bound(TimeBoundaryQuery.MIN_TIME) + .context(context) + .build() + ), + ImmutableList.of(new Object[]{DateTimes.of("2001-01-01").getMillis()}) + ); + } + + // Currently, if both min(__time) and max(__time) are present, we don't convert it + // to a timeBoundary query. (ref : https://github.com/apache/druid/issues/12479) + @Test + public void testMinMaxTimeQuery() throws Exception + { + testQuery( + "SELECT MIN(__time) AS minTime, MAX(__time) as maxTime FROM foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .aggregators( + new LongMinAggregatorFactory("a0", "__time"), + new LongMaxAggregatorFactory("a1", "__time") + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{ + DateTimes.of("2000-01-01").getMillis(), + DateTimes.of("2001-01-03").getMillis() + }) + ); + } +}