diff --git a/integration-tests/src/test/resources/queries/twitterstream_queries.json b/integration-tests/src/test/resources/queries/twitterstream_queries.json index cdd4057eb57..104b07ba47a 100644 --- a/integration-tests/src/test/resources/queries/twitterstream_queries.json +++ b/integration-tests/src/test/resources/queries/twitterstream_queries.json @@ -94,7 +94,8 @@ "context": { "useCache": "true", "populateCache": "true", - "timeout": 60000 + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -198,7 +199,8 @@ "context": { "useCache": "true", "populateCache": "true", - "timeout": 60000 + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -322,7 +324,8 @@ "context": { "useCache": "true", "populateCache": "true", - "timeout": 60000 + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -741,7 +744,8 @@ "context": { "useCache": "true", "populateCache": "true", - "timeout": 60000 + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java index 6bc387183ba..ffdb5bd15fa 100644 --- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java +++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.segment.ColumnValueSelector; @@ -94,11 +95,13 @@ public class CursorGranularizer timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); } - return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); + return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); } private final Cursor cursor; + private final Granularity granularity; + // Iterable that iterates over time buckets. private final Iterable bucketIterable; @@ -112,12 +115,14 @@ public class CursorGranularizer private CursorGranularizer( Cursor cursor, + Granularity granularity, Iterable bucketIterable, @Nullable ColumnValueSelector timeSelector, boolean descending ) { this.cursor = cursor; + this.granularity = granularity; this.bucketIterable = bucketIterable; this.timeSelector = timeSelector; this.descending = descending; @@ -133,13 +138,18 @@ public class CursorGranularizer return DateTimes.utc(currentBucketStart); } + public Interval getCurrentInterval() + { + return Intervals.utc(currentBucketStart, currentBucketEnd); + } + public boolean advanceToBucket(final Interval bucketInterval) { + currentBucketStart = bucketInterval.getStartMillis(); + currentBucketEnd = bucketInterval.getEndMillis(); if (cursor.isDone()) { return false; } - currentBucketStart = bucketInterval.getStartMillis(); - currentBucketEnd = bucketInterval.getEndMillis(); if (timeSelector == null) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 74b45023d04..a2a81c6eb74 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -89,6 +89,12 @@ public class QueryContexts public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; + // this flag controls whether the topN engine can use the 'pooled' algorithm when query granularity is set to + // anything other than 'ALL' and the cardinality + number of aggregators would require more size than is available + // in the buffers and so must reset the cursor to use multiple passes. This is likely slower than the default + // behavior of falling back to heap memory, but less dangerous since too large of a query can cause the heap to run + // out of memory + public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "useTopNMultiPassPooledQueryGranularity"; /** * Context parameter to enable/disable the extended filtered sum rewrite logic. * diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java index 8a1f142cd14..35d958f7ad1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java @@ -391,7 +391,7 @@ public class GroupByQueryEngine if (delegate != null && delegate.hasNext()) { return true; } else { - if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { + if (granularizer.currentOffsetWithinBucket()) { if (delegate != null) { delegate.close(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index a4605c3f265..b51b79da049 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -113,6 +113,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm= 0) { - aggregator.aggregate(resultsBuffer, position); - } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { - positions[dimIndex] = positionToAllocate; - position = positionToAllocate; - aggregator.init(resultsBuffer, position); - aggregator.aggregate(resultsBuffer, position); - positionToAllocate += aggregatorSize; + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimensionSelector.getRow(); + final int dimSize = dimValues.size(); + for (int i = 0; i < dimSize; i++) { + int dimIndex = dimValues.get(i); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator.aggregate(resultsBuffer, position); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + position = positionToAllocate; + aggregator.init(resultsBuffer, position); + aggregator.aggregate(resultsBuffer, position); + positionToAllocate += aggregatorSize; + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java index 4de281d8a0b..1e221992b02 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java @@ -57,29 +57,31 @@ public final class Generic2AggPooledTopNScannerPrototype implements Generic2AggP int totalAggregatorsSize = aggregator1Size + aggregator2Size; long processedRows = 0; int positionToAllocate = 0; - while (!cursor.isDoneOrInterrupted()) { - final IndexedInts dimValues = dimensionSelector.getRow(); - final int dimSize = dimValues.size(); - for (int i = 0; i < dimSize; i++) { - int dimIndex = dimValues.get(i); - int position = positions[dimIndex]; - if (position >= 0) { - aggregator1.aggregate(resultsBuffer, position); - aggregator2.aggregate(resultsBuffer, position + aggregator1Size); - } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { - positions[dimIndex] = positionToAllocate; - position = positionToAllocate; - aggregator1.init(resultsBuffer, position); - aggregator1.aggregate(resultsBuffer, position); - position += aggregator1Size; - aggregator2.init(resultsBuffer, position); - aggregator2.aggregate(resultsBuffer, position); - positionToAllocate += totalAggregatorsSize; + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimensionSelector.getRow(); + final int dimSize = dimValues.size(); + for (int i = 0; i < dimSize; i++) { + int dimIndex = dimValues.get(i); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator1.aggregate(resultsBuffer, position); + aggregator2.aggregate(resultsBuffer, position + aggregator1Size); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + position = positionToAllocate; + aggregator1.init(resultsBuffer, position); + aggregator1.aggregate(resultsBuffer, position); + position += aggregator1Size; + aggregator2.init(resultsBuffer, position); + aggregator2.aggregate(resultsBuffer, position); + positionToAllocate += totalAggregatorsSize; + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index 9dfc9ee3ded..f31b6c1e656 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -477,13 +477,105 @@ public class PooledTopNAlgorithm final int aggExtra = aggSize % AGG_UNROLL_COUNT; int currentPosition = 0; long processedRows = 0; - while (!cursor.isDoneOrInterrupted()) { - final IndexedInts dimValues = dimSelector.getRow(); + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimSelector.getRow(); - final int dimSize = dimValues.size(); - final int dimExtra = dimSize % AGG_UNROLL_COUNT; - switch (dimExtra) { - case 7: + final int dimSize = dimValues.size(); + final int dimExtra = dimSize % AGG_UNROLL_COUNT; + switch (dimExtra) { + case 7: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(6), + currentPosition + ); + // fall through + case 6: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(5), + currentPosition + ); + // fall through + case 5: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(4), + currentPosition + ); + // fall through + case 4: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(3), + currentPosition + ); + // fall through + case 3: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(2), + currentPosition + ); + // fall through + case 2: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(1), + currentPosition + ); + // fall through + case 1: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(0), + currentPosition + ); + } + for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { currentPosition = aggregateDimValue( positions, theAggregators, @@ -492,11 +584,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(6), + dimValues.get(i), currentPosition ); - // fall through - case 6: currentPosition = aggregateDimValue( positions, theAggregators, @@ -505,11 +595,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(5), + dimValues.get(i + 1), currentPosition ); - // fall through - case 5: currentPosition = aggregateDimValue( positions, theAggregators, @@ -518,11 +606,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(4), + dimValues.get(i + 2), currentPosition ); - // fall through - case 4: currentPosition = aggregateDimValue( positions, theAggregators, @@ -531,11 +617,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(3), + dimValues.get(i + 3), currentPosition ); - // fall through - case 3: currentPosition = aggregateDimValue( positions, theAggregators, @@ -544,11 +628,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(2), + dimValues.get(i + 4), currentPosition ); - // fall through - case 2: currentPosition = aggregateDimValue( positions, theAggregators, @@ -557,11 +639,9 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(1), + dimValues.get(i + 5), currentPosition ); - // fall through - case 1: currentPosition = aggregateDimValue( positions, theAggregators, @@ -570,103 +650,25 @@ public class PooledTopNAlgorithm aggregatorOffsets, aggSize, aggExtra, - dimValues.get(0), + dimValues.get(i + 6), currentPosition ); - } - for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 1), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 2), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 3), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 4), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 5), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 6), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 7), - currentPosition - ); - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(i + 7), + currentPosition + ); + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; + } } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 0c79e7c8d31..b6079a29f29 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -93,20 +93,22 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm makeAggregators(cursor, query.getAggregatorSpecs()) - ); + Aggregator[] theAggregators = aggregatesStore.computeIfAbsent( + key, + k -> makeAggregators(cursor, query.getAggregatorSpecs()) + ); - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; + for (Aggregator aggregator : theAggregators) { + aggregator.aggregate(); + } + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; + } } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 1382c9aaa4b..c8aa3292158 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.CursorGranularizer; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -245,6 +246,11 @@ public class TopNQueryEngine final int numBytesPerRecord ) { + if (cardinality < 0) { + // unknown cardinality doesn't work with the pooled algorithm which requires an exact count of dictionary ids + return false; + } + if (selector.isHasExtractionFn()) { // extraction functions can have a many to one mapping, and should use a heap algorithm return false; @@ -254,28 +260,35 @@ public class TopNQueryEngine // non-string output cannot use the pooled algorith, even if the underlying selector supports it return false; } + if (!Types.is(capabilities, ValueType.STRING)) { // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm return false; } - // string columns must use the on heap algorithm unless they have the following capabilites if (!capabilities.isDictionaryEncoded().isTrue() || !capabilities.areDictionaryValuesUnique().isTrue()) { + // string columns must use the on heap algorithm unless they have the following capabilites return false; } - if (Granularities.ALL.equals(query.getGranularity())) { - // all other requirements have been satisfied, ALL granularity can always use the pooled algorithms - return true; - } - // if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't - // need to make multiple passes (e.g. reset the cursor) + + // num values per pass must be greater than 0 or else the pooled algorithm cannot progress try (final ResourceHolder resultsBufHolder = bufferPool.take()) { final ByteBuffer resultsBuf = resultsBufHolder.get(); final int numBytesToWorkWith = resultsBuf.capacity(); final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality; - return numValuesPerPass <= cardinality; + final boolean allowMultiPassPooled = query.context().getBoolean( + QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, + false + ); + if (Granularities.ALL.equals(query.getGranularity()) || allowMultiPassPooled) { + return numValuesPerPass > 0; + } + + // if not using multi-pass for pooled + query granularity other than 'ALL', we must check that all values can fit + // in a single pass + return numValuesPerPass >= cardinality; } } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index 565ad036cea..41709e35497 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -94,23 +94,25 @@ public abstract class NullableNumericTopNColumnAggregatesProcessor BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) - ); - rowSelector[dimIndex] = aggs; - } + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + final IndexedInts dimValues = selector.getRow(); + for (int i = 0, size = dimValues.size(); i < size; ++i) { + final int dimIndex = dimValues.get(i); + Aggregator[] aggs = rowSelector[dimIndex]; + if (aggs == null) { + final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); + aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + rowSelector[dimIndex] = aggs; + } - for (Aggregator aggregator : aggs) { - aggregator.aggregate(); + for (Aggregator aggregator : aggs) { + aggregator.aggregate(); + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; } } return processedRows; @@ -192,22 +194,24 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates ) { long processedRows = 0; - while (!cursor.isDone()) { - final IndexedInts dimValues = selector.getRow(); - for (int i = 0, size = dimValues.size(); i < size; ++i) { - final int dimIndex = dimValues.get(i); - final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - Aggregator[] aggs = aggregatesStore.computeIfAbsent( - key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) - ); - for (Aggregator aggregator : aggs) { - aggregator.aggregate(); + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + final IndexedInts dimValues = selector.getRow(); + for (int i = 0, size = dimValues.size(); i < size; ++i) { + final int dimIndex = dimValues.get(i); + final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); + Aggregator[] aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + for (Aggregator aggregator : aggs) { + aggregator.aggregate(); + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; } } return processedRows; diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 42d8a72a0a0..c3a0982d371 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -60,6 +60,7 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory; @@ -6378,6 +6379,928 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest assertExpectedResults(expectedResults, query); } + + @Test + public void testTopN_time_granularity_empty_buckets() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_expression() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .virtualColumns( + new ExpressionVirtualColumn( + "vc", + "market + ' ' + placement", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ) + ) + .dimension("vc") + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + "vc", "total_market preferred", + "index", 2836L + ), + ImmutableMap.of( + "vc", "upfront preferred", + "index", 2681L + ), + ImmutableMap.of( + "vc", "spot preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + "vc", "total_market preferred", + "index", 2514L + ), + ImmutableMap.of( + "vc", "upfront preferred", + "index", 2193L + ), + ImmutableMap.of( + "vc", "spot preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_2pool() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + QueryRunnerTestHelper.INDEX_DOUBLE_MAX + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2836L, + "doubleMaxIndex", 1522.043733 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2681L, + "doubleMaxIndex", 1447.34116 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1102L, + "doubleMaxIndex", 158.747224 + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2514L, + "doubleMaxIndex", 1321.375057 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2193L, + "doubleMaxIndex", 1144.342401 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1120L, + "doubleMaxIndex", 166.016049 + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_timeExtract() + { + // this is pretty wierd to have both query granularity and a time extractionFn... but it is not explicitly + // forbidden so might as well test it + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension( + new ExtractionDimensionSpec( + ColumnHolder.TIME_COLUMN_NAME, + "dayOfWeek", + new TimeFormatExtractionFn("EEEE", null, null, null, false) + ) + ) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Collections.singletonList( + ImmutableMap.of( + "dayOfWeek", "Friday", + "index", 6619L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Collections.singletonList( + ImmutableMap.of( + "dayOfWeek", "Saturday", + "index", 5827L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_numeric() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(new DefaultDimensionSpec("qualityLong", "qualityLong", ColumnType.LONG)) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.asList( + ImmutableMap.of( + "qualityLong", 1600L, + "index", 2900L + ), + ImmutableMap.of( + "qualityLong", 1400L, + "index", 2870L + ), + ImmutableMap.of( + "qualityLong", 1200L, + "index", 158L + ), + ImmutableMap.of( + "qualityLong", 1000L, + "index", 135L + ), + ImmutableMap.of( + "qualityLong", 1500L, + "index", 121L + ), + ImmutableMap.of( + "qualityLong", 1300L, + "index", 120L + ), + ImmutableMap.of( + "qualityLong", 1800L, + "index", 119L + ), + ImmutableMap.of( + "qualityLong", 1100L, + "index", 118L + ), + ImmutableMap.of( + "qualityLong", 1700L, + "index", 78L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.asList( + ImmutableMap.of( + "qualityLong", 1600L, + "index", 2505L + ), + ImmutableMap.of( + "qualityLong", 1400L, + "index", 2447L + ), + ImmutableMap.of( + "qualityLong", 1200L, + "index", 166L + ), + ImmutableMap.of( + "qualityLong", 1000L, + "index", 147L + ), + ImmutableMap.of( + "qualityLong", 1800L, + "index", 126L + ), + ImmutableMap.of( + "qualityLong", 1500L, + "index", 114L + ), + ImmutableMap.of( + "qualityLong", 1300L, + "index", 113L + ), + ImmutableMap.of( + "qualityLong", 1100L, + "index", 112L + ), + ImmutableMap.of( + "qualityLong", 1700L, + "index", 97L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_multipass_no_pool() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_multipass_with_pooled() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .context(ImmutableMap.of(QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, true)) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_uses_heap_if_too_big() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 40000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + private void assumeTimeOrdered() { try (final CursorHolder cursorHolder =