topn with granularity regression fixes (#17565)

* topn with granularity regression fixes

changes:
* fix issue where topN with query granularity other than ALL would use the heap algorithm when it was actually able to use the pooled algorithm, and incorrectly used the pooled algorithm in cases where it must use the heap algorithm, a regression from #16533
* fix issue where topN with query granularity other than ALL could incorrectly process values in the wrong time bucket, another regression from #16533

* move defensive check outside of loop

* more test

* extra layer of safety

* move check outside of loop

* fix spelling

* add query context parameter to allow using pooled algorithm for topN when multiple passes are required even when query granularity is not ALL

* add comment, revert IT context changes and add new context flag
This commit is contained in:
Clint Wylie 2024-12-17 07:51:24 -08:00 committed by GitHub
parent 98b960c6ac
commit de9da37384
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1212 additions and 238 deletions

View File

@ -94,7 +94,8 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 60000,
"useTopNMultiPassPooledQueryGranularity": "true"
}
},
"expectedResults": [
@ -198,7 +199,8 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 60000,
"useTopNMultiPassPooledQueryGranularity": "true"
}
},
"expectedResults": [
@ -322,7 +324,8 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 60000,
"useTopNMultiPassPooledQueryGranularity": "true"
}
},
"expectedResults": [
@ -741,7 +744,8 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 60000,
"useTopNMultiPassPooledQueryGranularity": "true"
}
},
"expectedResults": [

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.ColumnValueSelector;
@ -94,11 +95,13 @@ public class CursorGranularizer
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
}
return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
}
private final Cursor cursor;
private final Granularity granularity;
// Iterable that iterates over time buckets.
private final Iterable<Interval> bucketIterable;
@ -112,12 +115,14 @@ public class CursorGranularizer
private CursorGranularizer(
Cursor cursor,
Granularity granularity,
Iterable<Interval> bucketIterable,
@Nullable ColumnValueSelector timeSelector,
boolean descending
)
{
this.cursor = cursor;
this.granularity = granularity;
this.bucketIterable = bucketIterable;
this.timeSelector = timeSelector;
this.descending = descending;
@ -133,13 +138,18 @@ public class CursorGranularizer
return DateTimes.utc(currentBucketStart);
}
/**
 * Returns the current bucket as an {@link Interval} built with {@code Intervals.utc} from
 * {@code currentBucketStart} and {@code currentBucketEnd}, i.e. the bounds recorded by the most recent
 * {@link #advanceToBucket(Interval)} call that updated them.
 */
public Interval getCurrentInterval()
{
return Intervals.utc(currentBucketStart, currentBucketEnd);
}
public boolean advanceToBucket(final Interval bucketInterval)
{
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (cursor.isDone()) {
return false;
}
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (timeSelector == null) {
return true;
}

View File

@ -89,6 +89,12 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
// this flag controls whether the topN engine can use the 'pooled' algorithm when query granularity is set to
// anything other than 'ALL' and the cardinality + number of aggregators would require more size than is available
// in the buffers and so must reset the cursor to use multiple passes. This is likely slower than the default
// behavior of falling back to heap memory, but less dangerous since too large of a query can cause the heap to run
// out of memory
public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "useTopNMultiPassPooledQueryGranularity";
/**
* Context parameter to enable/disable the extended filtered sum rewrite logic.
*

View File

@ -391,7 +391,7 @@ public class GroupByQueryEngine
if (delegate != null && delegate.hasNext()) {
return true;
} else {
if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
if (granularizer.currentOffsetWithinBucket()) {
if (delegate != null) {
delegate.close();
}

View File

@ -113,6 +113,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
try {
// reset cursor since we call run again
params.getCursor().reset();
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
// Run topN for all metrics for top N dimension values
allMetricsParam = allMetricAlgo.makeInitParams(params.getSelectorPlus(), params.getCursor(), params.getGranularizer());
allMetricAlgo.run(

View File

@ -97,12 +97,14 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
}
boolean hasDimValSelector = (dimValSelector != null);
int cardinality = params.getCardinality();
final int cardinality = params.getCardinality();
final int numValuesPerPass = params.getNumValuesPerPass();
int numProcessed = 0;
long processedRows = 0;
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
int maxNumToProcess = Math.min(numValuesPerPass, cardinality - numProcessed);
DimValSelector theDimValSelector;
if (!hasDimValSelector) {
@ -125,6 +127,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
numProcessed += numToProcess;
if (numProcessed < cardinality) {
params.getCursor().reset();
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
}
}
if (queryMetrics != null) {

View File

@ -54,25 +54,27 @@ public final class Generic1AggPooledTopNScannerPrototype implements Generic1AggP
{
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;

View File

@ -57,29 +57,31 @@ public final class Generic2AggPooledTopNScannerPrototype implements Generic2AggP
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;

View File

@ -477,13 +477,105 @@ public class PooledTopNAlgorithm
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
int currentPosition = 0;
long processedRows = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimSelector.getRow();
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimSelector.getRow();
final int dimSize = dimValues.size();
final int dimExtra = dimSize % AGG_UNROLL_COUNT;
switch (dimExtra) {
case 7:
final int dimSize = dimValues.size();
final int dimExtra = dimSize % AGG_UNROLL_COUNT;
switch (dimExtra) {
case 7:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(6),
currentPosition
);
// fall through
case 6:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(5),
currentPosition
);
// fall through
case 5:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(4),
currentPosition
);
// fall through
case 4:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(3),
currentPosition
);
// fall through
case 3:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(2),
currentPosition
);
// fall through
case 2:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(1),
currentPosition
);
// fall through
case 1:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(0),
currentPosition
);
}
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -492,11 +584,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(6),
dimValues.get(i),
currentPosition
);
// fall through
case 6:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -505,11 +595,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(5),
dimValues.get(i + 1),
currentPosition
);
// fall through
case 5:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -518,11 +606,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(4),
dimValues.get(i + 2),
currentPosition
);
// fall through
case 4:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -531,11 +617,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(3),
dimValues.get(i + 3),
currentPosition
);
// fall through
case 3:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -544,11 +628,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(2),
dimValues.get(i + 4),
currentPosition
);
// fall through
case 2:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -557,11 +639,9 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(1),
dimValues.get(i + 5),
currentPosition
);
// fall through
case 1:
currentPosition = aggregateDimValue(
positions,
theAggregators,
@ -570,103 +650,25 @@ public class PooledTopNAlgorithm
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(0),
dimValues.get(i + 6),
currentPosition
);
}
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 1),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 2),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 3),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 4),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 5),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 6),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 7),
currentPosition
);
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 7),
currentPosition
);
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
}
return processedRows;

View File

@ -93,20 +93,22 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Ob
final DimensionSelector dimSelector = params.getDimSelector();
long processedRows = 0;
while (!cursor.isDone()) {
final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDone()) {
final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
key,
k -> makeAggregators(cursor, query.getAggregatorSpecs())
);
Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
key,
k -> makeAggregators(cursor, query.getAggregatorSpecs())
);
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
}
return processedRows;

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.CursorGranularizer;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -245,6 +246,11 @@ public class TopNQueryEngine
final int numBytesPerRecord
)
{
if (cardinality < 0) {
// unknown cardinality doesn't work with the pooled algorithm which requires an exact count of dictionary ids
return false;
}
if (selector.isHasExtractionFn()) {
// extraction functions can have a many to one mapping, and should use a heap algorithm
return false;
@ -254,28 +260,35 @@ public class TopNQueryEngine
// non-string output cannot use the pooled algorithm, even if the underlying selector supports it
return false;
}
if (!Types.is(capabilities, ValueType.STRING)) {
// non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
return false;
}
// string columns must use the on heap algorithm unless they have the following capabilities
if (!capabilities.isDictionaryEncoded().isTrue() || !capabilities.areDictionaryValuesUnique().isTrue()) {
// string columns must use the on heap algorithm unless they have the following capabilities
return false;
}
if (Granularities.ALL.equals(query.getGranularity())) {
// all other requirements have been satisfied, ALL granularity can always use the pooled algorithms
return true;
}
// if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't
// need to make multiple passes (e.g. reset the cursor)
// num values per pass must be greater than 0 or else the pooled algorithm cannot progress
try (final ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take()) {
final ByteBuffer resultsBuf = resultsBufHolder.get();
final int numBytesToWorkWith = resultsBuf.capacity();
final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;
return numValuesPerPass <= cardinality;
final boolean allowMultiPassPooled = query.context().getBoolean(
QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY,
false
);
if (Granularities.ALL.equals(query.getGranularity()) || allowMultiPassPooled) {
return numValuesPerPass > 0;
}
// if not using multi-pass for pooled + query granularity other than 'ALL', we must check that all values can fit
// in a single pass
return numValuesPerPass >= cardinality;
}
}

View File

@ -94,23 +94,25 @@ public abstract class NullableNumericTopNColumnAggregatesProcessor<Selector exte
)
{
long processedRows = 0;
while (!cursor.isDone()) {
if (hasNulls && selector.isNull()) {
if (nullValueAggregates == null) {
nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDone()) {
if (hasNulls && selector.isNull()) {
if (nullValueAggregates == null) {
nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
}
for (Aggregator aggregator : nullValueAggregates) {
aggregator.aggregate();
}
} else {
Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor);
for (Aggregator aggregator : valueAggregates) {
aggregator.aggregate();
}
}
for (Aggregator aggregator : nullValueAggregates) {
aggregator.aggregate();
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
} else {
Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor);
for (Aggregator aggregator : valueAggregates) {
aggregator.aggregate();
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
return processedRows;

View File

@ -150,27 +150,29 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
)
{
long processedRows = 0;
while (!cursor.isDone()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] aggs = rowSelector[dimIndex];
if (aggs == null) {
final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
rowSelector[dimIndex] = aggs;
}
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDone()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] aggs = rowSelector[dimIndex];
if (aggs == null) {
final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
rowSelector[dimIndex] = aggs;
}
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
return processedRows;
@ -192,22 +194,24 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
)
{
long processedRows = 0;
while (!cursor.isDone()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
Aggregator[] aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDone()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
Aggregator[] aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucket()) {
break;
}
}
return processedRows;

View File

@ -60,6 +60,7 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory;
@ -6378,6 +6379,928 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
assertExpectedResults(expectedResults, query);
}
/**
 * Runs a topN on the market dimension, ranked by the index metric, with HOUR query granularity over
 * {@code QueryRunnerTestHelper.FIRST_TO_THIRD}, and expects one result per hourly bucket: populated rankings for
 * the 2011-04-01T00 and 2011-04-02T00 buckets, and an empty result list for every other hour from 2011-04-01T01
 * through 2011-04-02T23.
 */
@Test
public void testTopN_time_granularity_empty_buckets()
{
// NOTE(review): presumably skips the test when the segment under test is not time-ordered -- confirm in helper
assumeTimeOrdered();
// threshold is far larger than the three market values expected per populated bucket, so nothing is truncated
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(Granularities.HOUR)
.dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
.metric(QueryRunnerTestHelper.INDEX_METRIC)
.threshold(10_000)
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
.build();
// one expected Result per hour; only the midnight bucket of each day has rows, all others are empty
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
"index", 2836L
),
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
"index", 2681L
),
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
"index", 1102L
)
)
)
),
new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
"index", 2514L
),
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
"index", 2193L
),
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
"index", 1120L
)
)
)
),
new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())),
new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList()))
);
assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN over {@code FIRST_TO_THIRD}, grouping on a string expression virtual column
 * ({@code market + ' ' + placement}). The expected output has 48 hourly results: the midnight bucket of each day
 * carries rows, every other bucket is empty.
 */
@Test
public void testTopN_time_granularity_empty_buckets_expression()
{
  assumeTimeOrdered();

  final ExpressionVirtualColumn marketAndPlacement = new ExpressionVirtualColumn(
      "vc",
      "market + ' ' + placement",
      ColumnType.STRING,
      TestExprMacroTable.INSTANCE
  );
  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .virtualColumns(marketAndPlacement)
      .dimension("vc")
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
      .build();

  // rows expected in the 00:00 bucket of each day
  final List<Map<String, Object>> firstDayRows = Arrays.asList(
      ImmutableMap.of("vc", "total_market preferred", "index", 2836L),
      ImmutableMap.of("vc", "upfront preferred", "index", 2681L),
      ImmutableMap.of("vc", "spot preferred", "index", 1102L)
  );
  final List<Map<String, Object>> secondDayRows = Arrays.asList(
      ImmutableMap.of("vc", "total_market preferred", "index", 2514L),
      ImmutableMap.of("vc", "upfront preferred", "index", 2193L),
      ImmutableMap.of("vc", "spot preferred", "index", 1120L)
  );

  // one expected result per hour: rows at midnight of each day, nothing in the remaining 23 hours
  final String[] days = {"2011-04-01", "2011-04-02"};
  final List<List<Map<String, Object>>> midnightRows = Arrays.asList(firstDayRows, secondDayRows);
  final List<Result<TopNResultValue>> expectedResults = new ArrayList<>();
  for (int day = 0; day < days.length; day++) {
    for (int hour = 0; hour < 24; hour++) {
      final String timestamp = days[day] + "T" + (hour < 10 ? "0" : "") + hour + ":00:00.000Z";
      final TopNResultValue bucket;
      if (hour == 0) {
        bucket = TopNResultValue.create(midnightRows.get(day));
      } else {
        bucket = TopNResultValue.create(Collections.emptyList());
      }
      expectedResults.add(new Result<>(DateTimes.of(timestamp), bucket));
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN on the market dimension over {@code FIRST_TO_THIRD} with two aggregators (a long sum
 * and a double max). The expected output has 48 hourly results: the midnight bucket of each day carries rows,
 * every other bucket is empty.
 */
@Test
public void testTopN_time_granularity_empty_buckets_2pool()
{
  assumeTimeOrdered();

  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM, QueryRunnerTestHelper.INDEX_DOUBLE_MAX)
      .build();

  // rows expected in the 00:00 bucket of each day
  final List<Map<String, Object>> firstDayRows = Arrays.asList(
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
          "index", 2836L,
          "doubleMaxIndex", 1522.043733
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
          "index", 2681L,
          "doubleMaxIndex", 1447.34116
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
          "index", 1102L,
          "doubleMaxIndex", 158.747224
      )
  );
  final List<Map<String, Object>> secondDayRows = Arrays.asList(
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
          "index", 2514L,
          "doubleMaxIndex", 1321.375057
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
          "index", 2193L,
          "doubleMaxIndex", 1144.342401
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
          "index", 1120L,
          "doubleMaxIndex", 166.016049
      )
  );

  // one expected result per hour: rows at midnight of each day, nothing in the remaining 23 hours
  final String[] days = {"2011-04-01", "2011-04-02"};
  final List<List<Map<String, Object>>> midnightRows = Arrays.asList(firstDayRows, secondDayRows);
  final List<Result<TopNResultValue>> expectedResults = new ArrayList<>();
  for (int day = 0; day < days.length; day++) {
    for (int hour = 0; hour < 24; hour++) {
      final String timestamp = days[day] + "T" + (hour < 10 ? "0" : "") + hour + ":00:00.000Z";
      final TopNResultValue bucket;
      if (hour == 0) {
        bucket = TopNResultValue.create(midnightRows.get(day));
      } else {
        bucket = TopNResultValue.create(Collections.emptyList());
      }
      expectedResults.add(new Result<>(DateTimes.of(timestamp), bucket));
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN over {@code FIRST_TO_THIRD} whose dimension is the time column run through a
 * {@code TimeFormatExtractionFn} with format {@code "EEEE"}, output as {@code dayOfWeek}. The expected output has
 * 48 hourly results: the midnight bucket of each day carries a single row, every other bucket is empty.
 */
@Test
public void testTopN_time_granularity_empty_buckets_timeExtract()
{
  // Combining a query granularity with a time extractionFn is an unusual thing to do, but nothing explicitly
  // forbids it, so it is worth covering with a test.
  assumeTimeOrdered();

  final ExtractionDimensionSpec dayOfWeek = new ExtractionDimensionSpec(
      ColumnHolder.TIME_COLUMN_NAME,
      "dayOfWeek",
      new TimeFormatExtractionFn("EEEE", null, null, null, false)
  );
  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(dayOfWeek)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
      .build();

  // the single row expected in the 00:00 bucket of each day
  final List<Map<String, Object>> firstDayRows = Collections.singletonList(
      ImmutableMap.of("dayOfWeek", "Friday", "index", 6619L)
  );
  final List<Map<String, Object>> secondDayRows = Collections.singletonList(
      ImmutableMap.of("dayOfWeek", "Saturday", "index", 5827L)
  );

  // one expected result per hour: a row at midnight of each day, nothing in the remaining 23 hours
  final String[] days = {"2011-04-01", "2011-04-02"};
  final List<List<Map<String, Object>>> midnightRows = Arrays.asList(firstDayRows, secondDayRows);
  final List<Result<TopNResultValue>> expectedResults = new ArrayList<>();
  for (int day = 0; day < days.length; day++) {
    for (int hour = 0; hour < 24; hour++) {
      final String timestamp = days[day] + "T" + (hour < 10 ? "0" : "") + hour + ":00:00.000Z";
      final TopNResultValue bucket;
      if (hour == 0) {
        bucket = TopNResultValue.create(midnightRows.get(day));
      } else {
        bucket = TopNResultValue.create(Collections.emptyList());
      }
      expectedResults.add(new Result<>(DateTimes.of(timestamp), bucket));
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN over {@code FIRST_TO_THIRD} on the {@code qualityLong} dimension read as a LONG. The
 * expected output has 48 hourly results: the midnight bucket of each day carries nine rows, every other bucket
 * is empty.
 */
@Test
public void testTopN_time_granularity_empty_buckets_numeric()
{
  assumeTimeOrdered();

  final DefaultDimensionSpec qualityLong = new DefaultDimensionSpec("qualityLong", "qualityLong", ColumnType.LONG);
  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(qualityLong)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
      .build();

  // rows expected in the 00:00 bucket of each day, listed in the expected result order
  final List<Map<String, Object>> firstDayRows = Arrays.asList(
      ImmutableMap.of("qualityLong", 1600L, "index", 2900L),
      ImmutableMap.of("qualityLong", 1400L, "index", 2870L),
      ImmutableMap.of("qualityLong", 1200L, "index", 158L),
      ImmutableMap.of("qualityLong", 1000L, "index", 135L),
      ImmutableMap.of("qualityLong", 1500L, "index", 121L),
      ImmutableMap.of("qualityLong", 1300L, "index", 120L),
      ImmutableMap.of("qualityLong", 1800L, "index", 119L),
      ImmutableMap.of("qualityLong", 1100L, "index", 118L),
      ImmutableMap.of("qualityLong", 1700L, "index", 78L)
  );
  final List<Map<String, Object>> secondDayRows = Arrays.asList(
      ImmutableMap.of("qualityLong", 1600L, "index", 2505L),
      ImmutableMap.of("qualityLong", 1400L, "index", 2447L),
      ImmutableMap.of("qualityLong", 1200L, "index", 166L),
      ImmutableMap.of("qualityLong", 1000L, "index", 147L),
      ImmutableMap.of("qualityLong", 1800L, "index", 126L),
      ImmutableMap.of("qualityLong", 1500L, "index", 114L),
      ImmutableMap.of("qualityLong", 1300L, "index", 113L),
      ImmutableMap.of("qualityLong", 1100L, "index", 112L),
      ImmutableMap.of("qualityLong", 1700L, "index", 97L)
  );

  // one expected result per hour: rows at midnight of each day, nothing in the remaining 23 hours
  final String[] days = {"2011-04-01", "2011-04-02"};
  final List<List<Map<String, Object>>> midnightRows = Arrays.asList(firstDayRows, secondDayRows);
  final List<Result<TopNResultValue>> expectedResults = new ArrayList<>();
  for (int day = 0; day < days.length; day++) {
    for (int hour = 0; hour < 24; hour++) {
      final String timestamp = days[day] + "T" + (hour < 10 ? "0" : "") + hour + ":00:00.000Z";
      final TopNResultValue bucket;
      if (hour == 0) {
        bucket = TopNResultValue.create(midnightRows.get(day));
      } else {
        bucket = TopNResultValue.create(Collections.emptyList());
      }
      expectedResults.add(new Result<>(DateTimes.of(timestamp), bucket));
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN on the market dimension over {@code FIRST_TO_THIRD} with a long sum plus a
 * {@code StringAnyAggregatorFactory} named {@code big} on the placement dimension. The expected output has 48
 * hourly results: the midnight bucket of each day carries rows, every other bucket is empty.
 */
@Test
public void testTopN_time_granularity_multipass_no_pool()
{
  assumeTimeOrdered();

  // NOTE(review): the 4000000 argument is presumably a size chosen to make the query need multiple passes,
  // as the test name suggests - confirm against StringAnyAggregatorFactory.
  final StringAnyAggregatorFactory bigAggregator = new StringAnyAggregatorFactory(
      "big",
      QueryRunnerTestHelper.PLACEMENT_DIMENSION,
      4000000,
      null
  );
  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM, bigAggregator)
      .build();

  // rows expected in the 00:00 bucket of each day
  final List<Map<String, Object>> firstDayRows = Arrays.asList(
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
          "big", "preferred",
          "index", 2836L
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
          "big", "preferred",
          "index", 2681L
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
          "big", "preferred",
          "index", 1102L
      )
  );
  final List<Map<String, Object>> secondDayRows = Arrays.asList(
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
          "big", "preferred",
          "index", 2514L
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
          "big", "preferred",
          "index", 2193L
      ),
      ImmutableMap.of(
          QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
          "big", "preferred",
          "index", 1120L
      )
  );

  // one expected result per hour: rows at midnight of each day, nothing in the remaining 23 hours
  final String[] days = {"2011-04-01", "2011-04-02"};
  final List<List<Map<String, Object>>> midnightRows = Arrays.asList(firstDayRows, secondDayRows);
  final List<Result<TopNResultValue>> expectedResults = new ArrayList<>();
  for (int day = 0; day < days.length; day++) {
    for (int hour = 0; hour < 24; hour++) {
      final String timestamp = days[day] + "T" + (hour < 10 ? "0" : "") + hour + ":00:00.000Z";
      final TopNResultValue bucket;
      if (hour == 0) {
        bucket = TopNResultValue.create(midnightRows.get(day));
      } else {
        bucket = TopNResultValue.create(Collections.emptyList());
      }
      expectedResults.add(new Result<>(DateTimes.of(timestamp), bucket));
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN on the market dimension, ordered by the index metric, over
 * {@link QueryRunnerTestHelper#FIRST_TO_THIRD}, with the
 * {@link QueryContexts#TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY} context flag set to {@code true}.
 *
 * The query carries the index long-sum plus a string-any aggregator named "big" built with the value
 * 4000000 (presumably its max string size in bytes, chosen so the query needs more than one pass; confirm
 * against {@link StringAnyAggregatorFactory}).
 *
 * Expected output: one result per hour for two days (48 in total). Only the first hour of each day has rows
 * (total_market, upfront, spot, in that order); every other hourly bucket is expected to be empty.
 */
@Test
public void testTopN_time_granularity_multipass_with_pooled()
{
  assumeTimeOrdered();

  final Map<String, Object> pooledMultiPassContext =
      ImmutableMap.of(QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, true);
  final StringAnyAggregatorFactory bigAnyAggregator =
      new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null);

  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .context(pooledMultiPassContext)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM, bigAnyAggregator)
      .build();

  // The two days covered by the expected results, and for each day the "index" sums of the
  // first-hour rows listed as total_market, upfront, spot.
  final String[] days = {"2011-04-01", "2011-04-02"};
  final long[][] firstHourIndexSums = {
      {2836L, 2681L, 1102L},
      {2514L, 2193L, 1120L}
  };

  final List<Result<TopNResultValue>> expectedResults = new java.util.ArrayList<>();
  for (int dayIdx = 0; dayIdx < days.length; dayIdx++) {
    final String day = days[dayIdx];
    final long[] sums = firstHourIndexSums[dayIdx];

    // Hour 00 is the only bucket of the day that carries rows.
    expectedResults.add(
        new Result<>(
            DateTimes.of(day + "T00:00:00.000Z"),
            TopNResultValue.create(
                Arrays.<Map<String, Object>>asList(
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
                        "big", "preferred",
                        "index", sums[0]
                    ),
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
                        "big", "preferred",
                        "index", sums[1]
                    ),
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
                        "big", "preferred",
                        "index", sums[2]
                    )
                )
            )
        )
    );

    // Hours 01 through 23 are expected as empty buckets.
    for (int hour = 1; hour < 24; hour++) {
      final String twoDigitHour = (hour < 10 ? "0" : "") + hour;
      expectedResults.add(
          new Result<>(
              DateTimes.of(day + "T" + twoDigitHour + ":00:00.000Z"),
              TopNResultValue.create(Collections.emptyList())
          )
      );
    }
  }

  assertExpectedResults(expectedResults, query);
}
/**
 * Hourly-granularity topN on the market dimension, ordered by the index metric, over
 * {@link QueryRunnerTestHelper#FIRST_TO_THIRD}, with no query context overrides.
 *
 * The query carries the index long-sum plus a string-any aggregator named "big" built with the value
 * 40000000 (presumably its max string size in bytes; going by the test name this is meant to be too large
 * for the pooled path so the heap-based path is used; confirm against the topN engine).
 *
 * Expected output: one result per hour for two days (48 in total). Only the first hour of each day has rows
 * (total_market, upfront, spot, in that order); every other hourly bucket is expected to be empty.
 */
@Test
public void testTopN_time_granularity_uses_heap_if_too_big()
{
  assumeTimeOrdered();

  final StringAnyAggregatorFactory oversizedAnyAggregator =
      new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 40000000, null);

  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .granularity(Granularities.HOUR)
      .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
      .metric(QueryRunnerTestHelper.INDEX_METRIC)
      .threshold(10_000)
      .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM, oversizedAnyAggregator)
      .build();

  // The two days covered by the expected results, and for each day the "index" sums of the
  // first-hour rows listed as total_market, upfront, spot.
  final String[] days = {"2011-04-01", "2011-04-02"};
  final long[][] firstHourIndexSums = {
      {2836L, 2681L, 1102L},
      {2514L, 2193L, 1120L}
  };

  final List<Result<TopNResultValue>> expectedResults = new java.util.ArrayList<>();
  for (int dayIdx = 0; dayIdx < days.length; dayIdx++) {
    final String day = days[dayIdx];
    final long[] sums = firstHourIndexSums[dayIdx];

    // Hour 00 is the only bucket of the day that carries rows.
    expectedResults.add(
        new Result<>(
            DateTimes.of(day + "T00:00:00.000Z"),
            TopNResultValue.create(
                Arrays.<Map<String, Object>>asList(
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
                        "big", "preferred",
                        "index", sums[0]
                    ),
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
                        "big", "preferred",
                        "index", sums[1]
                    ),
                    ImmutableMap.of(
                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
                        "big", "preferred",
                        "index", sums[2]
                    )
                )
            )
        )
    );

    // Hours 01 through 23 are expected as empty buckets.
    for (int hour = 1; hour < 24; hour++) {
      final String twoDigitHour = (hour < 10 ? "0" : "") + hour;
      expectedResults.add(
          new Result<>(
              DateTimes.of(day + "T" + twoDigitHour + ":00:00.000Z"),
              TopNResultValue.create(Collections.emptyList())
          )
      );
    }
  }

  assertExpectedResults(expectedResults, query);
}
private void assumeTimeOrdered()
{
try (final CursorHolder cursorHolder =