Fix topN lexicographic sort (#5815)

* fixes #5814
changes:
* pass `StorageAdapter` to topn algorithms to get things like if column is 'sorted' or if query interval is smaller than segment granularity, instead of using `io.druid.segment.Capabilities`
* remove `io.druid.segment.Capabilities` since it had one purpose, supplying `dimensionValuesSorted` which is now provided directly by `StorageAdapter`.
* added test for topn optimization path checking

* add Capabilities back since StorageAdapter is marked PublicApi

* oops

* add javadoc, fix build i think

* correctly revert api changes

* fix intellij fail

* fix typo :(
This commit is contained in:
Clint Wylie 2018-05-31 09:53:29 -07:00 committed by Charles Allen
parent 50ad7a45ff
commit 2b45a6a42d
17 changed files with 577 additions and 72 deletions

View File

@ -26,8 +26,8 @@ import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -39,17 +39,17 @@ import java.util.List;
*/
public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], TopNParams>
{
private final Capabilities capabilities;
private final StorageAdapter storageAdapter;
private final TopNQuery query;
private final NonBlockingPool<ByteBuffer> bufferPool;
public AggregateTopNMetricFirstAlgorithm(
Capabilities capabilities,
StorageAdapter storageAdapter,
TopNQuery query,
NonBlockingPool<ByteBuffer> bufferPool
)
{
this.capabilities = capabilities;
this.storageAdapter = storageAdapter;
this.query = query;
this.bufferPool = bufferPool;
}
@ -91,7 +91,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
.build();
final TopNResultBuilder singleMetricResultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, singleMetricQuery);
PooledTopNAlgorithm singleMetricAlgo = new PooledTopNAlgorithm(capabilities, singleMetricQuery, bufferPool);
PooledTopNAlgorithm singleMetricAlgo = new PooledTopNAlgorithm(storageAdapter, singleMetricQuery, bufferPool);
PooledTopNAlgorithm.PooledTopNParams singleMetricParam = null;
int[] dimValSelector = null;
try {
@ -110,7 +110,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
singleMetricAlgo.cleanup(singleMetricParam);
}
PooledTopNAlgorithm allMetricAlgo = new PooledTopNAlgorithm(capabilities, query, bufferPool);
PooledTopNAlgorithm allMetricAlgo = new PooledTopNAlgorithm(storageAdapter, query, bufferPool);
PooledTopNAlgorithm.PooledTopNParams allMetricsParam = null;
try {
// Run topN for all metrics for top N dimension values

View File

@ -19,6 +19,7 @@
package io.druid.query.topn;
import com.google.common.annotations.VisibleForTesting;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.Aggregator;
@ -29,6 +30,7 @@ import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.StorageAdapter;
import javax.annotation.Nullable;
import java.util.Arrays;
@ -62,11 +64,11 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
return aggregators;
}
protected final Capabilities capabilities;
protected final StorageAdapter storageAdapter;
protected BaseTopNAlgorithm(Capabilities capabilities)
protected BaseTopNAlgorithm(StorageAdapter storageAdapter)
{
this.capabilities = capabilities;
this.storageAdapter = storageAdapter;
}
@Override
@ -134,6 +136,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
* This function currently handles TopNs on long and float columns, which do not provide cardinality or an ID lookup.
* When cardinality is unknown, process everything in one pass.
* Existing implementations of makeDimValSelector() require cardinality as well, so the DimValSelector is not used.
*
* @param params TopN parameters from run()
* @param resultBuilder Result builder from run()
*/
@ -206,9 +209,14 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
Aggregator[][] expansionAggs;
int cardinality;
public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality, Capabilities capabilities)
public AggregatorArrayProvider(
DimensionSelector dimSelector,
TopNQuery query,
int cardinality,
StorageAdapter storageAdapter
)
{
super(dimSelector, query, capabilities);
super(dimSelector, query, storageAdapter);
this.expansionAggs = new Aggregator[cardinality][];
this.cardinality = cardinality;
@ -236,17 +244,17 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
private final IdLookup idLookup;
private final TopNQuery query;
private final Capabilities capabilities;
private final StorageAdapter storageAdapter;
public BaseArrayProvider(
DimensionSelector dimSelector,
TopNQuery query,
Capabilities capabilities
StorageAdapter storageAdapter
)
{
this.idLookup = dimSelector.idLookup();
this.query = query;
this.capabilities = capabilities;
this.storageAdapter = storageAdapter;
previousStop = null;
ignoreAfterThreshold = false;
@ -261,6 +269,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
@Override
public void skipTo(String previousStop)
{
Capabilities capabilities = storageAdapter.getCapabilities();
if (capabilities.dimensionValuesSorted()) {
this.previousStop = previousStop;
}
@ -284,7 +293,8 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
keepOnlyN = n;
}
protected Pair<Integer, Integer> computeStartEnd(int cardinality)
@VisibleForTesting
public Pair<Integer, Integer> computeStartEnd(int cardinality)
{
int startIndex = ignoreFirstN;
@ -305,7 +315,9 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
int endIndex = Math.min(ignoreFirstN + keepOnlyN, cardinality);
if (ignoreAfterThreshold && query.getDimensionsFilter() == null) {
if (ignoreAfterThreshold &&
query.getDimensionsFilter() == null &&
query.getIntervals().stream().anyMatch(interval -> interval.contains(storageAdapter.getInterval()))) {
endIndex = Math.min(endIndex, startIndex + query.getThreshold());
}

View File

@ -23,24 +23,25 @@ import com.google.common.base.Function;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import java.util.Map;
/**
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
*/
public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
public class DimExtractionTopNAlgorithm
extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
{
private final TopNQuery query;
public DimExtractionTopNAlgorithm(
Capabilities capabilities,
StorageAdapter storageAdapter,
TopNQuery query
)
{
super(capabilities);
super(storageAdapter);
this.query = query;
}
@ -65,7 +66,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, capabilities);
return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter);
}
@Override

View File

@ -33,10 +33,10 @@ import io.druid.query.aggregation.SimpleDoubleBufferAggregator;
import io.druid.query.monomorphicprocessing.SpecializationService;
import io.druid.query.monomorphicprocessing.SpecializationState;
import io.druid.query.monomorphicprocessing.StringRuntimeShape;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FilteredOffset;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
@ -64,7 +64,9 @@ public class PooledTopNAlgorithm
private static boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN =
!Boolean.getBoolean("dontSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN");
/** See TopNQueryRunnerTest */
/**
* See TopNQueryRunnerTest
*/
@VisibleForTesting
static void setSpecializeGeneric1AggPooledTopN(boolean value)
{
@ -116,6 +118,7 @@ public class PooledTopNAlgorithm
}
private static final List<ScanAndAggregate> specializedScanAndAggregateImplementations = new ArrayList<>();
static {
computeSpecializedScanAndAggregateImplementations();
}
@ -197,12 +200,12 @@ public class PooledTopNAlgorithm
private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below
public PooledTopNAlgorithm(
Capabilities capabilities,
StorageAdapter storageAdapter,
TopNQuery query,
NonBlockingPool<ByteBuffer> bufferPool
)
{
super(capabilities);
super(storageAdapter);
this.query = query;
this.bufferPool = bufferPool;
}
@ -226,7 +229,7 @@ public class PooledTopNAlgorithm
final TopNMetricSpecBuilder<int[]> arrayProvider = new BaseArrayProvider<int[]>(
dimSelector,
query,
capabilities
storageAdapter
)
{
private final int[] positions = new int[cardinality];

View File

@ -20,11 +20,11 @@
package io.druid.query.topn;
import com.google.common.collect.Maps;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.segment.Capabilities;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import java.util.Map;
@ -33,9 +33,9 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
public static final int[] EMPTY_INTS = new int[]{};
private final TopNQuery query;
public TimeExtractionTopNAlgorithm(Capabilities capabilities, TopNQuery query)
public TimeExtractionTopNAlgorithm(StorageAdapter storageAdapter, TopNQuery query)
{
super(capabilities);
super(storageAdapter);
this.query = query;
}

View File

@ -30,7 +30,6 @@ import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.SegmentMissingException;
import io.druid.segment.StorageAdapter;
@ -109,7 +108,6 @@ public class TopNQueryEngine
final @Nullable TopNQueryMetrics queryMetrics
)
{
final Capabilities capabilities = adapter.getCapabilities();
final String dimension = query.getDimensionSpec().getDimension();
final int cardinality = adapter.getDimensionCardinality(dimension);
if (queryMetrics != null) {
@ -137,19 +135,19 @@ public class TopNQueryEngine
) {
// A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(capabilities, query);
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new DimExtractionTopNAlgorithm(capabilities, query);
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
} else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
&& columnCapabilities.isDictionaryEncoded())) {
// Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
topNAlgorithm = new DimExtractionTopNAlgorithm(capabilities, query);
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
} else if (selector.isAggregateAllMetrics()) {
topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool);
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(capabilities, query, bufferPool);
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
} else {
topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool);
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
if (queryMetrics != null) {
queryMetrics.algorithm(topNAlgorithm);
@ -162,7 +160,9 @@ public class TopNQueryEngine
{
return query.getDimensionSpec() != null
&& query.getDimensionSpec().getExtractionFn() != null
&& ExtractionFn.ExtractionType.ONE_TO_ONE.equals(query.getDimensionSpec().getExtractionFn().getExtractionType())
&& ExtractionFn.ExtractionType.ONE_TO_ONE.equals(query.getDimensionSpec()
.getExtractionFn()
.getExtractionType())
&& query.getTopNMetricSpec().canBeOptimizedUnordered();
}
}

View File

@ -28,8 +28,8 @@ import io.druid.query.topn.TopNResultBuilder;
import io.druid.segment.BaseDoubleColumnValueSelector;
import io.druid.segment.BaseFloatColumnValueSelector;
import io.druid.segment.BaseLongColumnValueSelector;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@ -51,7 +51,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
@Override
public Aggregator[][] getDimExtractionRowSelector(
TopNQuery query, TopNParams params, Capabilities capabilities
TopNQuery query, TopNParams params, StorageAdapter storageAdapter
)
{
return null;

View File

@ -26,9 +26,9 @@ import io.druid.query.topn.BaseTopNAlgorithm;
import io.druid.query.topn.TopNParams;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNResultBuilder;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
@ -50,7 +50,7 @@ public class StringTopNColumnSelectorStrategy
}
@Override
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities)
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
{
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@ -64,7 +64,7 @@ public class StringTopNColumnSelectorStrategy
(DimensionSelector) params.getSelectorPlus().getSelector(),
query,
params.getCardinality(),
capabilities
storageAdapter
);
return provider.build();

View File

@ -25,8 +25,8 @@ import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.topn.TopNParams;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNResultBuilder;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;
import javax.annotation.Nullable;
@ -53,10 +53,12 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
*
* @param query The TopN query being served
* @param params Parameters for the TopN query being served
* @param capabilities Object indicating if dimension values are sorted
* @param storageAdapter Column storage adapter, to provide information about the column that can be used for
* query optimization, e.g. whether dimension values are sorted or not
*
* @return an Aggregator[][] for integer-valued dimensions, null otherwise
*/
Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities);
Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
/**
* Used by DimExtractionTopNAlgorithm.
@ -88,6 +90,7 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
* @param cursor Cursor for the segment being queried
* @param rowSelector Integer lookup containing aggregators
* @param aggregatesStore Map containing aggregators
*
* @return the number of processed rows (after postFilters are applied inside the cursor being processed)
*/
long dimExtractionScanAndAggregate(

View File

@ -21,6 +21,7 @@ package io.druid.segment;
/**
*/
public class Capabilities
{
private final boolean dimensionValuesSorted;
@ -37,6 +38,10 @@ public class Capabilities
this.dimensionValuesSorted = dimensionValuesSorted;
}
/**
* Is dimension value dictionary sorted?
* @return
*/
public boolean dimensionValuesSorted()
{
return dimensionValuesSorted;

View File

@ -192,7 +192,7 @@ public final class DimensionHandlerUtils
}
}
// When determining the capabilites of a column during query processing, this function
// When determining the capabilities of a column during query processing, this function
// adjusts the capabilities for columns that cannot be handled as-is to manageable defaults
// (e.g., treating missing columns as empty String columns)
private static ColumnCapabilities getEffectiveCapabilities(

View File

@ -146,6 +146,6 @@ class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory
return virtualColumns.getColumnCapabilities(columnName);
}
return QueryableIndexStorageAdapter.getColumnCapabilites(index, columnName);
return QueryableIndexStorageAdapter.getColumnCapabilities(index, columnName);
}
}

View File

@ -167,7 +167,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Nullable
public ColumnCapabilities getColumnCapabilities(String column)
{
return getColumnCapabilites(index, column);
return getColumnCapabilities(index, column);
}
@Override
@ -316,7 +316,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Nullable
static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName)
static ColumnCapabilities getColumnCapabilities(ColumnSelector index, String columnName)
{
Column columnObj = index.getColumn(columnName);
if (columnObj == null) {

View File

@ -299,7 +299,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
for (DimensionSchema dimSchema : dimensionsSpec.getDimensions()) {
ValueType type = TYPE_MAP.get(dimSchema.getValueType());
String dimName = dimSchema.getName();
ColumnCapabilitiesImpl capabilities = makeCapabilitesFromValueType(type);
ColumnCapabilitiesImpl capabilities = makeCapabilitiesFromValueType(type);
capabilities.setHasBitmapIndexes(dimSchema.hasBitmapIndex());
if (dimSchema.getTypeName().equals(DimensionSchema.SPATIAL_TYPE_NAME)) {
@ -875,7 +875,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
}
}
private ColumnCapabilitiesImpl makeCapabilitesFromValueType(ValueType type)
private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type)
{
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setDictionaryEncoded(type == ValueType.STRING);

View File

@ -142,6 +142,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return indexer.getMaxValue();
}
@Override
public Capabilities getCapabilities()
{

View File

@ -20,7 +20,7 @@
package io.druid.query.topn;
import io.druid.collections.ResourceHolder;
import io.druid.segment.Capabilities;
import io.druid.segment.StorageAdapter;
import org.easymock.EasyMock;
import org.junit.Test;
@ -31,14 +31,14 @@ public class PooledTopNAlgorithmTest
@Test
public void testCleanupWithNullParams()
{
PooledTopNAlgorithm pooledTopNAlgorithm = new PooledTopNAlgorithm(Capabilities.builder().build(), null, null);
PooledTopNAlgorithm pooledTopNAlgorithm = new PooledTopNAlgorithm(EasyMock.mock(StorageAdapter.class), null, null);
pooledTopNAlgorithm.cleanup(null);
}
@Test
public void cleanup()
{
PooledTopNAlgorithm pooledTopNAlgorithm = new PooledTopNAlgorithm(Capabilities.builder().build(), null, null);
PooledTopNAlgorithm pooledTopNAlgorithm = new PooledTopNAlgorithm(EasyMock.mock(StorageAdapter.class), null, null);
PooledTopNAlgorithm.PooledTopNParams params = EasyMock.createMock(PooledTopNAlgorithm.PooledTopNParams.class);
ResourceHolder<ByteBuffer> resourceHolder = EasyMock.createMock(ResourceHolder.class);
EasyMock.expect(params.getResultsBufHolder()).andReturn(resourceHolder).times(1);

View File

@ -0,0 +1,480 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryMetrics;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.Metadata;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Arrays;
import static io.druid.query.QueryRunnerTestHelper.addRowsIndexConstant;
import static io.druid.query.QueryRunnerTestHelper.allGran;
import static io.druid.query.QueryRunnerTestHelper.commonDoubleAggregators;
import static io.druid.query.QueryRunnerTestHelper.dataSource;
import static io.druid.query.QueryRunnerTestHelper.indexMetric;
import static io.druid.query.QueryRunnerTestHelper.marketDimension;
import static io.druid.query.QueryRunnerTestHelper.qualityDimension;
public class TopNMetricSpecOptimizationsTest
{
@Test
public void testShouldOptimizeLexicographic()
{
// query interval is greater than segment interval, no filters, can ignoreAfterThreshold
int cardinality = 1234;
int threshold = 4;
TopNQuery query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.metric(indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
StorageAdapter adapter =
makeFakeStorageAdapter("2018-05-30T00:00:00Z", "2018-05-30T01:00:00Z", cardinality);
DimensionSelector dimSelector = makeFakeDimSelector(cardinality);
BaseTopNAlgorithm.AggregatorArrayProvider arrayProviderToTest = new BaseTopNAlgorithm.AggregatorArrayProvider(
dimSelector,
query,
cardinality,
adapter
);
arrayProviderToTest.ignoreAfterThreshold();
Pair<Integer, Integer> thePair = arrayProviderToTest.computeStartEnd(cardinality);
Assert.assertEquals(new Integer(0), thePair.lhs);
Assert.assertEquals(new Integer(threshold), thePair.rhs);
}
@Test
public void testAlsoShouldOptimizeLexicographic()
{
// query interval is same as segment interval, no filters, can ignoreAfterThreshold
int cardinality = 1234;
int threshold = 4;
TopNQuery query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.metric(indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-30T01:00:00Z")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
StorageAdapter adapter =
makeFakeStorageAdapter("2018-05-30T00:00:00Z", "2018-05-30T01:00:00Z", cardinality);
DimensionSelector dimSelector = makeFakeDimSelector(cardinality);
BaseTopNAlgorithm.AggregatorArrayProvider arrayProviderToTest = new BaseTopNAlgorithm.AggregatorArrayProvider(
dimSelector,
query,
cardinality,
adapter
);
arrayProviderToTest.ignoreAfterThreshold();
Pair<Integer, Integer> thePair = arrayProviderToTest.computeStartEnd(cardinality);
Assert.assertEquals(new Integer(0), thePair.lhs);
Assert.assertEquals(new Integer(threshold), thePair.rhs);
}
@Test
public void testShouldNotOptimizeLexicographic()
{
// query interval is smaller than segment interval, no filters, can ignoreAfterThreshold
int cardinality = 1234;
int threshold = 4;
TopNQuery query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.metric(indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-30T01:00:00Z")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
StorageAdapter adapter =
makeFakeStorageAdapter("2018-05-30T00:00:00Z", "2018-05-31T00:00:00Z", cardinality);
DimensionSelector dimSelector = makeFakeDimSelector(cardinality);
BaseTopNAlgorithm.AggregatorArrayProvider arrayProviderToTest = new BaseTopNAlgorithm.AggregatorArrayProvider(
dimSelector,
query,
cardinality,
adapter
);
arrayProviderToTest.ignoreAfterThreshold();
Pair<Integer, Integer> thePair = arrayProviderToTest.computeStartEnd(cardinality);
Assert.assertEquals(new Integer(0), thePair.lhs);
Assert.assertEquals(new Integer(cardinality), thePair.rhs);
}
@Test
public void testAlsoShouldNotOptimizeLexicographic()
{
// query interval is larger than segment interval, but has filters, can ignoreAfterThreshold
int cardinality = 1234;
int threshold = 4;
TopNQuery query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.filters(qualityDimension, "entertainment")
.metric(indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
StorageAdapter adapter =
makeFakeStorageAdapter("2018-05-30T00:00:00Z", "2018-05-30T01:00:00Z", cardinality);
DimensionSelector dimSelector = makeFakeDimSelector(cardinality);
BaseTopNAlgorithm.AggregatorArrayProvider arrayProviderToTest = new BaseTopNAlgorithm.AggregatorArrayProvider(
dimSelector,
query,
cardinality,
adapter
);
arrayProviderToTest.ignoreAfterThreshold();
Pair<Integer, Integer> thePair = arrayProviderToTest.computeStartEnd(cardinality);
Assert.assertEquals(new Integer(0), thePair.lhs);
Assert.assertEquals(new Integer(cardinality), thePair.rhs);
}
@Test
public void testAgainShouldNotOptimizeLexicographic()
{
// query interval is larger than segment interval, no filters, can NOT ignoreAfterThreshold
int cardinality = 1234;
int threshold = 4;
TopNQuery query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.metric(indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
StorageAdapter adapter =
makeFakeStorageAdapter("2018-05-30T00:00:00Z", "2018-05-30T01:00:00Z", cardinality);
DimensionSelector dimSelector = makeFakeDimSelector(cardinality);
BaseTopNAlgorithm.AggregatorArrayProvider arrayProviderToTest = new BaseTopNAlgorithm.AggregatorArrayProvider(
dimSelector,
query,
cardinality,
adapter
);
Pair<Integer, Integer> thePair = arrayProviderToTest.computeStartEnd(cardinality);
Assert.assertEquals(new Integer(0), thePair.lhs);
Assert.assertEquals(new Integer(cardinality), thePair.rhs);
}
private StorageAdapter makeFakeStorageAdapter(String start, String end, int cardinality)
{
StorageAdapter adapter = new StorageAdapter()
{
@Override
public Interval getInterval()
{
return Intervals.of(start + "/" + end);
}
@Override
public int getDimensionCardinality(String column)
{
return cardinality;
}
@Override
public DateTime getMinTime()
{
return DateTimes.of(start);
}
@Override
public DateTime getMaxTime()
{
return DateTimes.of(end);
}
// stubs below this line not important for tests
@Override
public String getSegmentIdentifier()
{
return null;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return null;
}
@Override
public Iterable<String> getAvailableMetrics()
{
return null;
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
return null;
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
return null;
}
@Override
public Capabilities getCapabilities()
{
return Capabilities.builder().dimensionValuesSorted(true).build();
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return null;
}
@Nullable
@Override
public String getColumnTypeName(String column)
{
return null;
}
@Override
public int getNumRows()
{
return 0;
}
@Override
public DateTime getMaxIngestedEventTime()
{
return null;
}
@Override
public Metadata getMetadata()
{
return null;
}
@Override
public Sequence<Cursor> makeCursors(
@Nullable Filter filter,
Interval interval,
VirtualColumns virtualColumns,
Granularity gran,
boolean descending,
@Nullable QueryMetrics<?> queryMetrics
)
{
return null;
}
};
return adapter;
}
private DimensionSelector makeFakeDimSelector(int cardinality)
{
DimensionSelector dimSelector = new DimensionSelector()
{
@Override
public int getValueCardinality()
{
return cardinality;
}
// stubs below this line not important for tests
@Override
public IndexedInts getRow()
{
return null;
}
@Override
public ValueMatcher makeValueMatcher(@Nullable String value)
{
return null;
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return null;
}
@Nullable
@Override
public String lookupName(int id)
{
return null;
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return false;
}
@Nullable
@Override
public IdLookup idLookup()
{
return null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
@Nullable
@Override
public Object getObject()
{
return null;
}
@Override
public Class classOfObject()
{
return null;
}
};
return dimSelector;
}
}