fix topn aggregation on numeric columns with null values (#9183)

* fix topn issue with aggregating on numeric columns with null values

* adjustments

* rename

* add more tests

* fix comments

* more javadocs

* computeIfAbsent
Clint Wylie 2020-01-17 18:12:24 -08:00 committed by Jihoon Son
parent 153495068b
commit f0dddaa51a
20 changed files with 878 additions and 548 deletions
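The core of the change, as a minimal self-contained sketch in plain Java (all names here are illustrative stand-ins, not the actual Druid classes): rows whose numeric column value is null are aggregated into a dedicated null slot, while non-null values are keyed by their primitive bit pattern and their aggregators are created lazily with computeIfAbsent, mirroring the new NullableNumericTopNColumnAggregatesProcessor and its subclasses in the diff below.

import java.util.HashMap;
import java.util.Map;

// Simplified model of the fix; hypothetical names, not the real Druid API.
public class NullAwareDoubleAggregatesSketch
{
  // aggregates for non-null values, keyed by the value's primitive bit pattern
  private final Map<Long, long[]> valueAggregates = new HashMap<>();
  // aggregates for rows whose column value is null live in their own slot
  private long[] nullAggregates = null;

  public void aggregate(Double columnValue)
  {
    if (columnValue == null) {
      if (nullAggregates == null) {
        nullAggregates = new long[]{0}; // e.g. a single "count" aggregator
      }
      nullAggregates[0]++;
    } else {
      long key = Double.doubleToLongBits(columnValue);
      // aggregators are created lazily, as in the new computeIfAbsent usage
      long[] aggs = valueAggregates.computeIfAbsent(key, k -> new long[]{0});
      aggs[0]++;
    }
  }
}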

View File

@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.segment.Capabilities;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
@ -79,7 +78,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
@Nullable TopNQueryMetrics queryMetrics
)
{
if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) {
if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) {
runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics);
} else {
runWithCardinalityUnknown(params, resultBuilder, queryMetrics);

View File

@ -21,21 +21,22 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import java.util.Map;
/**
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
* Heap-based topN algorithm that handles aggregates on dimension extractions and numeric-typed dimension columns.
*
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle
* multiple index numerals referencing the same dimension value.
*/
public class DimExtractionTopNAlgorithm
extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
public class HeapBasedTopNAlgorithm
extends BaseTopNAlgorithm<Aggregator[][], TopNColumnAggregatesProcessor, TopNParams>
{
private final TopNQuery query;
public DimExtractionTopNAlgorithm(
public HeapBasedTopNAlgorithm(
StorageAdapter storageAdapter,
TopNQuery query
)
@ -47,7 +48,7 @@ public class DimExtractionTopNAlgorithm
@Override
public TopNParams makeInitParams(
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
final Cursor cursor
)
{
@ -64,8 +65,8 @@ public class DimExtractionTopNAlgorithm
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter);
ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter);
}
@Override
@ -75,54 +76,46 @@ public class DimExtractionTopNAlgorithm
}
@Override
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params)
{
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore();
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy();
}
@Override
public long scanAndAggregate(
protected long scanAndAggregate(
TopNParams params,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore
TopNColumnAggregatesProcessor processor
)
{
final Cursor cursor = params.getCursor();
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate(
processor.initAggregateStore();
return processor.scanAndAggregate(
query,
selectorPlus.getSelector(),
cursor,
rowSelector,
aggregatesStore
rowSelector
);
}
@Override
protected void updateResults(
TopNParams params,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore,
Aggregator[][] aggregators,
TopNColumnAggregatesProcessor processor,
TopNResultBuilder resultBuilder
)
{
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
aggregatesStore,
resultBuilder
);
processor.updateResults(resultBuilder);
}
@Override
protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
protected void closeAggregators(TopNColumnAggregatesProcessor processor)
{
for (Aggregator[] aggregators : valueMap.values()) {
for (Aggregator agg : aggregators) {
agg.close();
}
}
processor.closeAggregators();
}
@Override

View File

@ -27,10 +27,11 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable<?>, Aggregator[]>, TopNParams>
{
private static final int[] EMPTY_INTS = new int[]{};
@ -74,17 +75,16 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
}
@Override
@SuppressWarnings("unchecked")
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
protected Map<Comparable<?>, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
return new HashMap<>();
}
@Override
protected long scanAndAggregate(
TopNParams params,
int[] dimValSelector,
Map<Comparable, Aggregator[]> aggregatesStore
Map<Comparable<?>, Aggregator[]> aggregatesStore
)
{
if (params.getCardinality() < 0) {
@ -96,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
long processedRows = 0;
while (!cursor.isDone()) {
final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
final Comparable<?> key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
@ -118,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
protected void updateResults(
TopNParams params,
int[] dimValSelector,
Map<Comparable, Aggregator[]> aggregatesStore,
Map<Comparable<?>, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
@ -140,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
}
@Override
protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
protected void closeAggregators(Map<Comparable<?>, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {

View File

@ -21,7 +21,7 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import javax.annotation.Nullable;
@ -34,7 +34,7 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
int INIT_POSITION_VALUE = -1;
int SKIP_POSITION_VALUE = -2;
TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus, Cursor cursor);
TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus, Cursor cursor);
void run(
Parameters params,

View File

@ -21,7 +21,8 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -42,13 +43,15 @@ public class TopNMapFn
}
@SuppressWarnings("unchecked")
@Nullable
public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
{
final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
query.getDimensionSpec(),
cursor.getColumnSelectorFactory()
);
final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
DimensionHandlerUtils.createColumnSelectorPlus(
new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
query.getDimensionSpec(),
cursor.getColumnSelectorFactory()
);
if (selectorPlus.getSelector() == null) {
return null;

View File

@ -20,7 +20,7 @@
package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
@ -28,13 +28,14 @@ import org.apache.druid.segment.DimensionSelector;
*/
public class TopNParams
{
public static final int CARDINALITY_UNKNOWN = -1;
private final Cursor cursor;
private final int cardinality;
private final int numValuesPerPass;
private final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus;
private final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus;
protected TopNParams(
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
Cursor cursor,
int numValuesPerPass
)
@ -52,7 +53,7 @@ public class TopNParams
return (DimensionSelector) selectorPlus.getSelector();
}
public ColumnSelectorPlus<TopNColumnSelectorStrategy> getSelectorPlus()
public ColumnSelectorPlus<TopNColumnAggregatesProcessor> getSelectorPlus()
{
return selectorPlus;
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.query.topn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import org.apache.druid.collections.NonBlockingPool;
@ -30,7 +29,6 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -86,16 +84,11 @@ public class TopNQueryEngine
query.isDescending(),
queryMetrics
),
new Function<Cursor, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(Cursor input)
{
if (queryMetrics != null) {
queryMetrics.cursor(input);
}
return mapFn.apply(input, queryMetrics);
input -> {
if (queryMetrics != null) {
queryMetrics.cursor(input);
}
return mapFn.apply(input, queryMetrics);
}
),
Predicates.notNull()
@ -125,7 +118,8 @@ public class TopNQueryEngine
final ColumnCapabilities columnCapabilities = query.getVirtualColumns()
.getColumnCapabilitiesWithFallback(adapter, dimension);
final TopNAlgorithm topNAlgorithm;
final TopNAlgorithm<?, ?> topNAlgorithm;
if (
selector.isHasExtractionFn() &&
// TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
@ -137,20 +131,23 @@ public class TopNQueryEngine
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
&& columnCapabilities.isDictionaryEncoded())) {
// Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
// Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
// Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
// a many-to-one mapping, since numeric types can't represent all possible values of other types.)
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (selector.isAggregateAllMetrics()) {
// sorted by dimension
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
// high cardinality dimensions with larger result sets
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
} else {
// anything else
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
if (queryMetrics != null) {

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.BaseTopNAlgorithm;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.Cursor;
import java.util.Map;
import java.util.function.Function;
public class DoubleTopNColumnAggregatesProcessor
extends NullableNumericTopNColumnAggregatesProcessor<BaseDoubleColumnValueSelector>
{
private Long2ObjectMap<Aggregator[]> aggregatesStore;
protected DoubleTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
{
super(converter);
}
@Override
Aggregator[] getValueAggregators(
TopNQuery query,
BaseDoubleColumnValueSelector selector,
Cursor cursor
)
{
long key = Double.doubleToLongBits(selector.getDouble());
return aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
}
@Override
public void initAggregateStore()
{
nullValueAggregates = null;
aggregatesStore = new Long2ObjectOpenHashMap<>();
}
@Override
Map<?, Aggregator[]> getAggregatesStore()
{
return aggregatesStore;
}
@Override
Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.BaseTopNAlgorithm;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.Cursor;
import java.util.Map;
import java.util.function.Function;
public class FloatTopNColumnAggregatesProcessor
extends NullableNumericTopNColumnAggregatesProcessor<BaseFloatColumnValueSelector>
{
private Int2ObjectMap<Aggregator[]> aggregatesStore;
protected FloatTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
{
super(converter);
}
@Override
Aggregator[] getValueAggregators(
TopNQuery query,
BaseFloatColumnValueSelector selector,
Cursor cursor
)
{
int key = Float.floatToIntBits(selector.getFloat());
return aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
}
@Override
public void initAggregateStore()
{
nullValueAggregates = null;
this.aggregatesStore = new Int2ObjectOpenHashMap<>();
}
@Override
Map<?, Aggregator[]> getAggregatesStore()
{
return aggregatesStore;
}
@Override
Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.BaseTopNAlgorithm;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.Cursor;
import java.util.Map;
import java.util.function.Function;
public class LongTopNColumnAggregatesProcessor
extends NullableNumericTopNColumnAggregatesProcessor<BaseLongColumnValueSelector>
{
private Long2ObjectMap<Aggregator[]> aggregatesStore;
public LongTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
{
super(converter);
}
@Override
Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor)
{
long key = selector.getLong();
return aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
}
@Override
public void initAggregateStore()
{
nullValueAggregates = null;
aggregatesStore = new Long2ObjectOpenHashMap<>();
}
@Override
Map<?, Aggregator[]> getAggregatesStore()
{
return aggregatesStore;
}
@Override
Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(aggregatorStoreKey);
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.BaseTopNAlgorithm;
import org.apache.druid.query.topn.TopNParams;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNResultBuilder;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import java.util.Map;
import java.util.function.Function;
/**
* Base {@link TopNColumnAggregatesProcessor} for {@link BaseNullableColumnValueSelector}. Aggregates for non-null
* selector values are stored in a type-appropriate primitive map, created by {@link #initAggregateStore()} and
* available via {@link #getAggregatesStore()}; aggregates for null-valued rows are stored in a separate
* {@link #nullValueAggregates} {@link Aggregator} array.
*
* {@link #updateResults} will combine both the map and null aggregates to populate the {@link TopNResultBuilder} with
* the values produced by {@link #scanAndAggregate}.
*/
public abstract class NullableNumericTopNColumnAggregatesProcessor<Selector extends BaseNullableColumnValueSelector>
implements TopNColumnAggregatesProcessor<Selector>
{
private final boolean hasNulls = !NullHandling.replaceWithDefault();
final Function<Object, Comparable<?>> converter;
Aggregator[] nullValueAggregates;
protected NullableNumericTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
{
this.converter = converter;
}
/**
* Get the {@link Aggregator} set for the current {@code Selector} row value for a given {@link Cursor}
*/
abstract Aggregator[] getValueAggregators(TopNQuery query, Selector selector, Cursor cursor);
/**
* Get primitive numeric map for value aggregates created by {@link #scanAndAggregate}, to be used by
* {@link #updateResults} to apply to the {@link TopNResultBuilder}
*/
abstract Map<?, Aggregator[]> getAggregatesStore();
/**
* Method to convert primitive numeric value keys used by {@link #getAggregatesStore} into the correct representation
* for the {@link TopNResultBuilder}, called by {@link #updateResults}
*/
abstract Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
@Override
public int getCardinality(Selector selector)
{
return TopNParams.CARDINALITY_UNKNOWN;
}
@Override
public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
{
return null;
}
@Override
public long scanAndAggregate(
TopNQuery query,
Selector selector,
Cursor cursor,
Aggregator[][] rowSelector
)
{
long processedRows = 0;
while (!cursor.isDone()) {
if (hasNulls && selector.isNull()) {
if (nullValueAggregates == null) {
nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
}
for (Aggregator aggregator : nullValueAggregates) {
aggregator.aggregate();
}
} else {
Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor);
for (Aggregator aggregator : valueAggregates) {
aggregator.aggregate();
}
}
cursor.advance();
processedRows++;
}
return processedRows;
}
@Override
public void updateResults(TopNResultBuilder resultBuilder)
{
for (Map.Entry<?, Aggregator[]> entry : getAggregatesStore().entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
for (int i = 0; i < aggs.length; i++) {
vals[i] = aggs[i].get();
}
final Comparable<?> key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
resultBuilder.addEntry(key, key, vals);
}
}
if (nullValueAggregates != null) {
Object[] nullVals = new Object[nullValueAggregates.length];
for (int i = 0; i < nullValueAggregates.length; i++) {
nullVals[i] = nullValueAggregates[i].get();
}
resultBuilder.addEntry(null, null, nullVals);
}
}
@Override
public void closeAggregators()
{
for (Aggregator[] aggregators : getAggregatesStore().values()) {
for (Aggregator agg : aggregators) {
agg.close();
}
}
if (nullValueAggregates != null) {
for (Aggregator nullAgg : nullValueAggregates) {
nullAgg.close();
}
}
}
}
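A hedged sketch of what updateResults above amounts to, reusing the simplified types from the earlier sketch (a List of Object[] stands in for TopNResultBuilder and long[] for Aggregator[]; illustrative only): entries from the primitive-keyed value store are added first, then the separate null aggregates become one extra entry keyed by null.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only; mirrors the merge performed by updateResults above.
final class ResultMergeSketch
{
  static List<Object[]> merge(Map<Long, long[]> valueAggregates, long[] nullAggregates)
  {
    List<Object[]> results = new ArrayList<>();
    for (Map.Entry<Long, long[]> entry : valueAggregates.entrySet()) {
      // convert the primitive key back to a column value, as convertAggregatorStoreKeyToColumnValue does
      Double key = Double.longBitsToDouble(entry.getKey());
      results.add(new Object[]{key, entry.getValue()[0]});
    }
    if (nullAggregates != null) {
      // the null bucket contributes a single null-keyed entry
      results.add(new Object[]{null, nullAggregates[0]});
    }
    return results;
  }
}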

View File

@ -1,277 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.BaseTopNAlgorithm;
import org.apache.druid.query.topn.TopNParams;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNResultBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;
import java.util.Map;
import java.util.function.Function;
public abstract class NumericTopNColumnSelectorStrategy<
ValueSelectorType,
DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
implements TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType>
{
public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType)
{
final Function<Object, Comparable<?>> converter = DimensionHandlerUtils.converterFromTypeToType(
selectorType,
dimensionType
);
switch (selectorType) {
case LONG:
return new OfLong(converter);
case FLOAT:
return new OfFloat(converter);
case DOUBLE:
return new OfDouble(converter);
default:
throw new IAE("No strategy for type[%s]", selectorType);
}
}
@Override
public int getCardinality(ValueSelectorType selector)
{
return TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN;
}
@Override
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
{
return null;
}
static long floatDimExtractionScanAndAggregate(
TopNQuery query,
BaseFloatColumnValueSelector selector,
Cursor cursor,
Int2ObjectMap<Aggregator[]> aggregatesStore
)
{
long processedRows = 0;
while (!cursor.isDone()) {
int key = Float.floatToIntBits(selector.getFloat());
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
cursor.advance();
processedRows++;
}
return processedRows;
}
static long doubleDimExtractionScanAndAggregate(
TopNQuery query,
BaseDoubleColumnValueSelector selector,
Cursor cursor,
Long2ObjectMap<Aggregator[]> aggregatesStore
)
{
long processedRows = 0;
while (!cursor.isDone()) {
long key = Double.doubleToLongBits(selector.getDouble());
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
cursor.advance();
processedRows++;
}
return processedRows;
}
static long longDimExtractionScanAndAggregate(
TopNQuery query,
BaseLongColumnValueSelector selector,
Cursor cursor,
Long2ObjectMap<Aggregator[]> aggregatesStore
)
{
long processedRows = 0;
while (!cursor.isDone()) {
long key = selector.getLong();
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
cursor.advance();
processedRows++;
}
return processedRows;
}
@Override
public void updateDimExtractionResults(
final DimExtractionAggregateStoreType aggregatesStore,
final TopNResultBuilder resultBuilder
)
{
for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
for (int i = 0; i < aggs.length; i++) {
vals[i] = aggs[i].get();
}
final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
resultBuilder.addEntry(key, key, vals);
}
}
}
abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
static class OfFloat
extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, Int2ObjectMap<Aggregator[]>>
{
private final Function<Object, Comparable<?>> converter;
OfFloat(final Function<Object, Comparable<?>> converter)
{
this.converter = converter;
}
@Override
public Int2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
{
return new Int2ObjectOpenHashMap<>();
}
@Override
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
}
@Override
public long dimExtractionScanAndAggregate(
TopNQuery query,
BaseFloatColumnValueSelector selector,
Cursor cursor,
Aggregator[][] rowSelector,
Int2ObjectMap<Aggregator[]> aggregatesStore
)
{
return floatDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
}
}
static class OfLong
extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, Long2ObjectMap<Aggregator[]>>
{
private final Function<Object, Comparable<?>> converter;
OfLong(final Function<Object, Comparable<?>> converter)
{
this.converter = converter;
}
@Override
public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
{
return new Long2ObjectOpenHashMap<>();
}
@Override
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(aggregatorStoreKey);
}
@Override
public long dimExtractionScanAndAggregate(
TopNQuery query,
BaseLongColumnValueSelector selector,
Cursor cursor,
Aggregator[][] rowSelector,
Long2ObjectMap<Aggregator[]> aggregatesStore
)
{
return longDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
}
}
static class OfDouble
extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, Long2ObjectMap<Aggregator[]>>
{
private final Function<Object, Comparable<?>> converter;
OfDouble(final Function<Object, Comparable<?>> converter)
{
this.converter = converter;
}
@Override
public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
{
return new Long2ObjectOpenHashMap<>();
}
@Override
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
{
return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
}
@Override
public long dimExtractionScanAndAggregate(
TopNQuery query,
BaseDoubleColumnValueSelector selector,
Cursor cursor,
Aggregator[][] rowSelector,
Long2ObjectMap<Aggregator[]> aggregatesStore
)
{
return doubleDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
}
}
}

View File

@ -36,12 +36,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class StringTopNColumnSelectorStrategy
implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable<?>, Aggregator[]>>
public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
{
private final Function<Object, Comparable<?>> dimensionValueConverter;
private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
{
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
}
@ -53,7 +53,7 @@ public class StringTopNColumnSelectorStrategy
}
@Override
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
{
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@ -74,34 +74,9 @@ public class StringTopNColumnSelectorStrategy
}
@Override
public Map<Comparable<?>, Aggregator[]> makeDimExtractionAggregateStore()
public void updateResults(TopNResultBuilder resultBuilder)
{
return new HashMap<>();
}
@Override
public long dimExtractionScanAndAggregate(
TopNQuery query,
DimensionSelector selector,
Cursor cursor,
Aggregator[][] rowSelector,
Map<Comparable<?>, Aggregator[]> aggregatesStore
)
{
if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore);
} else {
return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore);
}
}
@Override
public void updateDimExtractionResults(
final Map<Comparable<?>, Aggregator[]> aggregatesStore,
final TopNResultBuilder resultBuilder
)
{
for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
@ -115,12 +90,42 @@ public class StringTopNColumnSelectorStrategy
}
}
private long dimExtractionScanAndAggregateWithCardinalityKnown(
@Override
public void closeAggregators()
{
for (Aggregator[] aggregators : aggregatesStore.values()) {
for (Aggregator agg : aggregators) {
agg.close();
}
}
}
@Override
public long scanAndAggregate(
TopNQuery query,
DimensionSelector selector,
Cursor cursor,
Aggregator[][] rowSelector
)
{
if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
} else {
return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
}
}
@Override
public void initAggregateStore()
{
this.aggregatesStore = new HashMap<>();
}
private long scanAndAggregateWithCardinalityKnown(
TopNQuery query,
Cursor cursor,
DimensionSelector selector,
Aggregator[][] rowSelector,
Map<Comparable<?>, Aggregator[]> aggregatesStore
Aggregator[][] rowSelector
)
{
long processedRows = 0;
@ -128,18 +133,17 @@ public class StringTopNColumnSelectorStrategy
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] theAggregators = rowSelector[dimIndex];
if (theAggregators == null) {
Aggregator[] aggs = rowSelector[dimIndex];
if (aggs == null) {
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
rowSelector[dimIndex] = theAggregators;
aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
rowSelector[dimIndex] = aggs;
}
for (Aggregator aggregator : theAggregators) {
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}
@ -149,11 +153,10 @@ public class StringTopNColumnSelectorStrategy
return processedRows;
}
private long dimExtractionScanAndAggregateWithCardinalityUnknown(
private long scanAndAggregateWithCardinalityUnknown(
TopNQuery query,
Cursor cursor,
DimensionSelector selector,
Map<Comparable<?>, Aggregator[]> aggregatesStore
DimensionSelector selector
)
{
long processedRows = 0;
@ -162,13 +165,11 @@ public class StringTopNColumnSelectorStrategy
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
for (Aggregator aggregator : theAggregators) {
Aggregator[] aggs = aggregatesStore.computeIfAbsent(
key,
k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
);
for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}

View File

@ -21,28 +21,39 @@ package org.apache.druid.query.topn.types;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
import org.apache.druid.query.topn.HeapBasedTopNAlgorithm;
import org.apache.druid.query.topn.TopNParams;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNResultBuilder;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import java.util.Map;
import javax.annotation.Nullable;
public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType extends Map>
extends ColumnSelectorStrategy
/**
* This {@link ColumnSelectorStrategy} is used by all {@link org.apache.druid.query.topn.TopNAlgorithm} to provide
* selector value cardinality to {@link TopNParams} (perhaps unnecessarily, but that is another matter), but is primarily
* used by {@link HeapBasedTopNAlgorithm} to serve as its value aggregates store.
*
* Given a query, column value selector, and cursor to process, the aggregates store is populated by calling
* {@link #scanAndAggregate} and can be applied to {@link TopNResultBuilder} through {@link #updateResults}.
*/
public interface TopNColumnAggregatesProcessor<ValueSelectorType> extends ColumnSelectorStrategy
{
int CARDINALITY_UNKNOWN = -1;
/**
* Get value cardinality of underlying {@link ColumnValueSelector}
*/
int getCardinality(ValueSelectorType selector);
/**
* Used by DimExtractionTopNAlgorithm.
* Used by {@link HeapBasedTopNAlgorithm}.
*
* Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters.
* Create an Aggregator[][] using {@link org.apache.druid.query.topn.BaseTopNAlgorithm.AggregatorArrayProvider} and
* the given parameters.
*
* As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types
* that use integer row values.
* that use integer row values, e.g. string columns where the value cardinality is known.
*
* A dimension type that does not have integer values should return null.
*
@ -53,30 +64,24 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
*
* @return an Aggregator[][] for integer-valued dimensions, null otherwise
*/
Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
@Nullable
Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
/**
* Used by DimExtractionTopNAlgorithm.
* Used by {@link HeapBasedTopNAlgorithm}. The contract of this method requires calling {@link #initAggregateStore()}
* prior to calling this method.
*
* Creates an aggregate store map suitable for this strategy's type that will be
* passed to dimExtractionScanAndAggregate() and updateDimExtractionResults().
* Iterate through the {@link Cursor}, reading the current row from a dimension value selector, and for each row
* value:
* 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup), usable if value cardinality
* is known, or from aggregatesStore (slower map).
*
* @return Aggregate store map
*/
DimExtractionAggregateStoreType makeDimExtractionAggregateStore();
/**
* Used by DimExtractionTopNAlgorithm.
* 2. If the rowSelector/aggregatesStore did not have an entry for a particular row value, this function
* should retrieve the current Aggregator[] using
* {@link org.apache.druid.query.topn.BaseTopNAlgorithm#makeAggregators} and the provided cursor and query,
* storing them in rowSelector/aggregatesStore
*
* Iterate through the cursor, reading the current row from a dimension value selector, and for each row value:
* 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from
* aggregatesStore (slower map).
*
* 2. If the rowSelector and/or aggregatesStore did not have an entry for a particular row value,
* this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the
* provided cursor and query, storing them in rowSelector and aggregatesStore
*
* 3. Call aggregate() on each of the aggregators.
* 3. Call {@link Aggregator#aggregate()} on each of the aggregators.
*
* If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only.
*
@ -84,29 +89,33 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
* @param selector Dimension value selector
* @param cursor Cursor for the segment being queried
* @param rowSelector Integer lookup containing aggregators
* @param aggregatesStore Map containing aggregators
*
* @return the number of processed rows (after postFilters are applied inside the cursor being processed)
*/
long dimExtractionScanAndAggregate(
long scanAndAggregate(
TopNQuery query,
ValueSelectorType selector,
Cursor cursor,
Aggregator[][] rowSelector,
DimExtractionAggregateStoreType aggregatesStore
Aggregator[][] rowSelector
);
/**
* Used by DimExtractionTopNAlgorithm.
* Used by {@link HeapBasedTopNAlgorithm}.
*
* Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the
* valueTransformer to the keys if present
*
* @param aggregatesStore Map created by makeDimExtractionAggregateStore()
* @param resultBuilder TopN result builder
*/
void updateDimExtractionResults(
DimExtractionAggregateStoreType aggregatesStore,
TopNResultBuilder resultBuilder
);
void updateResults(TopNResultBuilder resultBuilder);
/**
* Initializes the underlying aggregates store to something appropriate for the selector type
*/
void initAggregateStore();
/**
* Closes all on-heap {@link Aggregator}s associated with the aggregates processor
*/
void closeAggregators();
}
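The javadocs above imply a fixed calling order that HeapBasedTopNAlgorithm follows when it drives a processor; a small sketch of that contract (the interface and driver below are hypothetical stand-ins with simplified signatures, not the real Druid types):

import java.util.List;

// Hypothetical stand-ins illustrating the calling order only.
interface ProcessorContractSketch
{
  void initAggregateStore();                        // must run before scanAndAggregate
  long scanAndAggregate();                          // fills the aggregates store, returns processed row count
  void updateResults(List<Object[]> resultBuilder); // flushes the store into the result builder
  void closeAggregators();                          // releases the on-heap aggregators
}

final class HeapBasedDriverSketch
{
  static long run(ProcessorContractSketch processor, List<Object[]> resultBuilder)
  {
    processor.initAggregateStore();
    long rows = processor.scanAndAggregate();
    processor.updateResults(resultBuilder);
    processor.closeAggregators();
    return rows;
  }
}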

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import java.util.function.Function;
public class TopNColumnAggregatesProcessorFactory
implements ColumnSelectorStrategyFactory<TopNColumnAggregatesProcessor<?>>
{
private final ValueType dimensionType;
public TopNColumnAggregatesProcessorFactory(final ValueType dimensionType)
{
this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
}
@Override
public TopNColumnAggregatesProcessor<?> makeColumnSelectorStrategy(
ColumnCapabilities capabilities,
ColumnValueSelector selector
)
{
final ValueType selectorType = capabilities.getType();
if (selectorType.equals(ValueType.STRING)) {
return new StringTopNColumnAggregatesProcessor(dimensionType);
} else if (selectorType.isNumeric()) {
final Function<Object, Comparable<?>> converter;
final ValueType strategyType;
// When the selector is numeric, we want to use a NullableNumericTopNColumnAggregatesProcessor. It aggregates using
// a numeric type and then converts to the desired output type after aggregating. We must be careful not to
// convert to an output type that cannot represent all possible values of the input type.
if (ValueType.isNumeric(dimensionType)) {
// Return strategy that aggregates using the _output_ type, because this allows us to collapse values
// properly (numeric types cannot always represent all values of other numeric types).
converter = DimensionHandlerUtils.converterFromTypeToType(dimensionType, dimensionType);
strategyType = dimensionType;
} else {
// Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
// represent all possible values of the input type. This will be true for STRING, which is the only
// non-numeric type currently supported.
converter = DimensionHandlerUtils.converterFromTypeToType(selectorType, dimensionType);
strategyType = selectorType;
}
switch (strategyType) {
case LONG:
return new LongTopNColumnAggregatesProcessor(converter);
case FLOAT:
return new FloatTopNColumnAggregatesProcessor(converter);
case DOUBLE:
return new DoubleTopNColumnAggregatesProcessor(converter);
}
}
throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
}
}
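Why the factory aggregates using the output type when the dimension spec is numeric (the "collapse values properly" comment above): converting to the output type before aggregating merges values that are indistinguishable in that type. A tiny illustration, assuming a DOUBLE column read through a LONG-typed dimension spec (hypothetical, self-contained example):

import java.util.HashMap;
import java.util.Map;

// Converting keys to the LONG output type before aggregating merges 1.2 and 1.7 into key 1.
final class OutputTypeCollapseSketch
{
  public static void main(String[] args)
  {
    double[] columnValues = {1.2, 1.7, 2.0};
    Map<Long, Integer> countsByLongKey = new HashMap<>();
    for (double v : columnValues) {
      countsByLongKey.merge((long) v, 1, Integer::sum);
    }
    System.out.println(countsByLongKey); // {1=2, 2=1}
  }
}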

View File

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn.types;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
{
private final ValueType dimensionType;
public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
{
this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
}
@Override
public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities,
ColumnValueSelector selector
)
{
final ValueType selectorType = capabilities.getType();
switch (selectorType) {
case STRING:
// Return strategy that reads strings and outputs dimensionTypes.
return new StringTopNColumnSelectorStrategy(dimensionType);
case LONG:
case FLOAT:
case DOUBLE:
// When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using
// a numeric type and then converts to the desired output type after aggregating. We must be careful not to
// convert to an output type that cannot represent all possible values of the input type.
if (ValueType.isNumeric(dimensionType)) {
// Return strategy that aggregates using the _output_ type, because this allows us to collapse values
// properly (numeric types cannot always represent all values of other numeric types).
return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType);
} else {
// Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
// represent all possible values of the input type. This will be true for STRING, which is the only
// non-numeric type currently supported.
return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType);
}
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
}
}
}

View File

@ -63,9 +63,9 @@ public final class DimensionHandlerUtils
.setDictionaryEncoded(true)
.setHasBitmapIndexes(true);
public static DimensionHandler getHandlerFromCapabilities(
public static DimensionHandler<?, ?, ?> getHandlerFromCapabilities(
String dimensionName,
ColumnCapabilities capabilities,
@Nullable ColumnCapabilities capabilities,
@Nullable MultiValueHandling multiValueHandling
)
{
@ -112,7 +112,7 @@ public final class DimensionHandlerUtils
* {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
* list of dimensionSpecs and then retrieving the only element in the returned array.
*
* @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
* @param <Strategy> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpec column to generate a ColumnSelectorPlus object for
* @param cursor Used to create value selectors for columns.
@ -121,8 +121,8 @@ public final class DimensionHandlerUtils
*
* @see ColumnProcessors#makeProcessor which may replace this in the future
*/
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass> createColumnSelectorPlus(
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy> createColumnSelectorPlus(
ColumnSelectorStrategyFactory<Strategy> strategyFactory,
DimensionSpec dimensionSpec,
ColumnSelectorFactory cursor
)
@ -141,7 +141,7 @@ public final class DimensionHandlerUtils
* A caller should define a strategy factory that provides an interface for type-specific operations
* in a query engine. See GroupByStrategyFactory for a reference.
*
* @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
* @param <Strategy> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
* @param columnSelectorFactory Used to create value selectors for columns.
@ -150,32 +150,29 @@ public final class DimensionHandlerUtils
*
* @see ColumnProcessors#makeProcessor which may replace this in the future
*/
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
//CHECKSTYLE.OFF: Indentation
ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
//CHECKSTYLE.ON: Indentation
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy>[] createColumnSelectorPluses(
ColumnSelectorStrategyFactory<Strategy> strategyFactory,
List<DimensionSpec> dimensionSpecs,
ColumnSelectorFactory columnSelectorFactory
)
{
int dimCount = dimensionSpecs.size();
@SuppressWarnings("unchecked")
ColumnSelectorPlus<ColumnSelectorStrategyClass>[] dims = new ColumnSelectorPlus[dimCount];
ColumnSelectorPlus<Strategy>[] dims = new ColumnSelectorPlus[dimCount];
for (int i = 0; i < dimCount; i++) {
final DimensionSpec dimSpec = dimensionSpecs.get(i);
final String dimName = dimSpec.getDimension();
final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec(
final ColumnValueSelector<?> selector = getColumnValueSelectorFromDimensionSpec(
dimSpec,
columnSelectorFactory
);
ColumnSelectorStrategyClass strategy = makeStrategy(
Strategy strategy = makeStrategy(
strategyFactory,
dimSpec,
columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()),
selector
);
final ColumnSelectorPlus<ColumnSelectorStrategyClass> selectorPlus = new ColumnSelectorPlus<>(
final ColumnSelectorPlus<Strategy> selectorPlus = new ColumnSelectorPlus<>(
dimName,
dimSpec.getOutputName(),
strategy,
@ -186,7 +183,7 @@ public final class DimensionHandlerUtils
return dims;
}
private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec(
private static ColumnValueSelector<?> getColumnValueSelectorFromDimensionSpec(
DimensionSpec dimSpec,
ColumnSelectorFactory columnSelectorFactory
)
@ -194,12 +191,10 @@ public final class DimensionHandlerUtils
String dimName = dimSpec.getDimension();
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName);
capabilities = getEffectiveCapabilities(dimSpec, capabilities);
switch (capabilities.getType()) {
case STRING:
return columnSelectorFactory.makeDimensionSelector(dimSpec);
default:
return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
if (capabilities.getType() == ValueType.STRING) {
return columnSelectorFactory.makeDimensionSelector(dimSpec);
}
return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
}
/**
@ -238,11 +233,11 @@ public final class DimensionHandlerUtils
return capabilities;
}
private static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorStrategyClass makeStrategy(
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
private static <Strategy extends ColumnSelectorStrategy> Strategy makeStrategy(
ColumnSelectorStrategyFactory<Strategy> strategyFactory,
DimensionSpec dimSpec,
@Nullable ColumnCapabilities capabilities,
ColumnValueSelector selector
ColumnValueSelector<?> selector
)
{
capabilities = getEffectiveCapabilities(dimSpec, capabilities);
View File
@ -188,6 +188,7 @@ public class VirtualColumns implements Cacheable
}
}
@Nullable
public DimensionSelector makeDimensionSelector(
DimensionSpec dimensionSpec,
ColumnSelector columnSelector,
@ -202,6 +203,7 @@ public class VirtualColumns implements Cacheable
}
}
@Nullable
public ColumnValueSelector<?> makeColumnValueSelector(
String columnName,
ColumnSelector columnSelector,
@ -253,6 +255,7 @@ public class VirtualColumns implements Cacheable
}
}
@Nullable
public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName)
{
final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName);
View File
@ -100,6 +100,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -5832,9 +5833,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("longNumericNull", 10L)
.build(),
makeNumericNullRowHelper("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
makeNumericNullRowHelper("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
makeNumericNullRowHelper("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
makeRowWithNulls("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
makeRowWithNulls("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
makeRowWithNulls("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
)
)
)
@ -5900,9 +5901,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("floatNumericNull", 10f)
.build(),
makeNumericNullRowHelper("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
makeNumericNullRowHelper("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
makeNumericNullRowHelper("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
makeRowWithNulls("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
makeRowWithNulls("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
makeRowWithNulls("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
)
)
)
@ -5968,9 +5969,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("doubleNumericNull", 10d)
.build(),
makeNumericNullRowHelper("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
makeNumericNullRowHelper("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
makeNumericNullRowHelper("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
makeRowWithNulls("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
makeRowWithNulls("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
makeRowWithNulls("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
)
)
)
@ -5978,16 +5979,113 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
assertExpectedResults(expectedResults, query);
}
private static Map<String, Object> makeNumericNullRowHelper(
@Test
public void testAggregateOnLongNumericNull()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(new DefaultDimensionSpec("longNumericNull", "dim", ValueType.LONG))
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(10000)
.aggregators(new CountAggregatorFactory("count"))
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L),
makeRowWithNulls("dim", 10L, "count", 93L),
makeRowWithNulls("dim", 20L, "count", 93L),
makeRowWithNulls("dim", 40L, "count", 93L),
makeRowWithNulls("dim", 50L, "count", 279L),
makeRowWithNulls("dim", 70L, "count", 279L),
makeRowWithNulls("dim", 80L, "count", 93L)
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test
public void testAggregateOnDoubleNumericNull()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(new DefaultDimensionSpec("doubleNumericNull", "dim", ValueType.DOUBLE))
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(10000)
.aggregators(new CountAggregatorFactory("count"))
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L),
makeRowWithNulls("dim", 10.0, "count", 93L),
makeRowWithNulls("dim", 20.0, "count", 93L),
makeRowWithNulls("dim", 40.0, "count", 93L),
makeRowWithNulls("dim", 50.0, "count", 279L),
makeRowWithNulls("dim", 70.0, "count", 279L),
makeRowWithNulls("dim", 80.0, "count", 93L)
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test
public void testAggregateOnFloatNumericNull()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(new DefaultDimensionSpec("floatNumericNull", "dim", ValueType.FLOAT))
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(10000)
.aggregators(new CountAggregatorFactory("count"))
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L),
makeRowWithNulls("dim", 10.0f, "count", 93L),
makeRowWithNulls("dim", 20.0f, "count", 93L),
makeRowWithNulls("dim", 40.0f, "count", 93L),
makeRowWithNulls("dim", 50.0f, "count", 279L),
makeRowWithNulls("dim", 70.0f, "count", 279L),
makeRowWithNulls("dim", 80.0f, "count", 93L)
)
)
)
);
assertExpectedResults(expectedResults, query);
}
private static Map<String, Object> makeRowWithNulls(
String dimName,
Object dimValue,
String nameOfColumnWithNull,
Object defaultNullValue
@Nullable Object dimValue,
String metric,
@Nullable Object metricVal
)
{
Map<String, Object> nullRow = new HashMap<>();
nullRow.put(dimName, dimValue);
nullRow.put(nameOfColumnWithNull, defaultNullValue);
nullRow.put(metric, metricVal);
return nullRow;
}
}
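A note on the makeRowWithNulls helper above: rows containing nulls are built with a plain HashMap because Guava's ImmutableMap, which the non-null rows in this test class build via ImmutableMap.builder(), rejects null values. A minimal, self-contained illustration of that difference; the class and variable names here are made up for the example and are not part of this change.

import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.ImmutableMap;

public class NullValueMapSketch
{
  public static void main(String[] args)
  {
    // HashMap accepts null values, so a row like ("dim" -> null, "count" -> 279)
    // can be expressed directly.
    Map<String, Object> row = new HashMap<>();
    row.put("dim", null);
    row.put("count", 279L);
    System.out.println(row);

    try {
      // ImmutableMap throws NullPointerException on null values, which is
      // presumably why the helper does not reuse the ImmutableMap.Builder pattern.
      Map<String, Object> rejected = ImmutableMap.of("dim", (Object) null);
      System.out.println(rejected);
    }
    catch (NullPointerException expected) {
      System.out.println("ImmutableMap rejects null values");
    }
  }
}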
View File
@ -2683,6 +2683,126 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testNullDoubleTopN() throws Exception
{
List<Object[]> expected;
if (useDefault) {
expected = ImmutableList.of(
new Object[]{1.7, 1L},
new Object[]{1.0, 1L},
new Object[]{0.0, 4L}
);
} else {
expected = ImmutableList.of(
new Object[]{null, 3L},
new Object[]{1.7, 1L},
new Object[]{1.0, 1L},
new Object[]{0.0, 1L}
);
}
testQuery(
"SELECT d1, COUNT(*) FROM druid.numfoo GROUP BY d1 ORDER BY d1 DESC LIMIT 10",
QUERY_CONTEXT_DEFAULT,
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("d1", "_d0", ValueType.DOUBLE))
.threshold(10)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.metric(
new InvertedTopNMetricSpec(
new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
@Test
public void testNullFloatTopN() throws Exception
{
List<Object[]> expected;
if (useDefault) {
expected = ImmutableList.of(
new Object[]{1.0f, 1L},
new Object[]{0.1f, 1L},
new Object[]{0.0f, 4L}
);
} else {
expected = ImmutableList.of(
new Object[]{null, 3L},
new Object[]{1.0f, 1L},
new Object[]{0.1f, 1L},
new Object[]{0.0f, 1L}
);
}
testQuery(
"SELECT f1, COUNT(*) FROM druid.numfoo GROUP BY f1 ORDER BY f1 DESC LIMIT 10",
QUERY_CONTEXT_DEFAULT,
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("f1", "_d0", ValueType.FLOAT))
.threshold(10)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.metric(
new InvertedTopNMetricSpec(
new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
@Test
public void testNullLongTopN() throws Exception
{
List<Object[]> expected;
if (useDefault) {
expected = ImmutableList.of(
new Object[]{325323L, 1L},
new Object[]{7L, 1L},
new Object[]{0L, 4L}
);
} else {
expected = ImmutableList.of(
new Object[]{null, 3L},
new Object[]{325323L, 1L},
new Object[]{7L, 1L},
new Object[]{0L, 1L}
);
}
testQuery(
"SELECT l1, COUNT(*) FROM druid.numfoo GROUP BY l1 ORDER BY l1 DESC LIMIT 10",
QUERY_CONTEXT_DEFAULT,
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("l1", "_d0", ValueType.LONG))
.threshold(10)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.metric(
new InvertedTopNMetricSpec(
new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
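The branch on useDefault in the three tests above reflects Druid's two null-handling modes: with druid.generic.useDefaultValueForNull=true (default-value mode) the null rows fold into the 0 / 0.0 bucket, so the zero group's count grows, while in SQL-compatible mode null remains its own group. A small probe of the NullHandling utility, assuming the druid-core NullHandling class and its test-only initializer as used throughout these test suites:

import org.apache.druid.common.config.NullHandling;

public class NullHandlingProbe
{
  public static void main(String[] args)
  {
    // Test-only initializer; production code configures this from the
    // druid.generic.useDefaultValueForNull property instead.
    NullHandling.initializeForTests();

    // In default-value mode these return 0 / 0.0 / 0.0f; in SQL-compatible mode
    // they return null, which is what the expected topN rows above encode.
    System.out.println("replaceWithDefault = " + NullHandling.replaceWithDefault());
    System.out.println("defaultLongValue   = " + NullHandling.defaultLongValue());
    System.out.println("defaultDoubleValue = " + NullHandling.defaultDoubleValue());
    System.out.println("defaultFloatValue  = " + NullHandling.defaultFloatValue());
  }
}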
@Test
public void testEmptyStringEquality() throws Exception
{