diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index bb49c357f3f..3a1fdbdadfd 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; import org.apache.druid.segment.Capabilities; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; @@ -79,7 +78,7 @@ public abstract class BaseTopNAlgorithm, TopNParams> +public class HeapBasedTopNAlgorithm + extends BaseTopNAlgorithm { private final TopNQuery query; - public DimExtractionTopNAlgorithm( + public HeapBasedTopNAlgorithm( StorageAdapter storageAdapter, TopNQuery query ) @@ -47,7 +48,7 @@ public class DimExtractionTopNAlgorithm @Override public TopNParams makeInitParams( - final ColumnSelectorPlus selectorPlus, + final ColumnSelectorPlus selectorPlus, final Cursor cursor ) { @@ -64,8 +65,8 @@ public class DimExtractionTopNAlgorithm if (params.getCardinality() < 0) { throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); } - ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter); + ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter); } @Override @@ -75,54 +76,46 @@ public class DimExtractionTopNAlgorithm } @Override - protected Map makeDimValAggregateStore(TopNParams params) + protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params) { - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + return selectorPlus.getColumnSelectorStrategy(); } @Override - public long scanAndAggregate( + protected long scanAndAggregate( TopNParams params, Aggregator[][] rowSelector, - Map aggregatesStore + TopNColumnAggregatesProcessor processor ) { final Cursor cursor = params.getCursor(); - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate( + processor.initAggregateStore(); + return processor.scanAndAggregate( query, selectorPlus.getSelector(), cursor, - rowSelector, - aggregatesStore + rowSelector ); } @Override protected void updateResults( TopNParams params, - Aggregator[][] rowSelector, - Map aggregatesStore, + Aggregator[][] aggregators, + TopNColumnAggregatesProcessor processor, TopNResultBuilder resultBuilder ) { - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults( - aggregatesStore, - resultBuilder - ); + processor.updateResults(resultBuilder); } @Override - protected void closeAggregators(Map valueMap) + protected void closeAggregators(TopNColumnAggregatesProcessor processor) { - for (Aggregator[] aggregators : valueMap.values()) { - for (Aggregator agg : aggregators) { - agg.close(); - } - } + processor.closeAggregators(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 7d4c23cee23..a23518a4aa9 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -27,10 +27,11 @@ import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ValueType; +import java.util.HashMap; import java.util.Map; import java.util.function.Function; -public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm, TopNParams> +public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm, Aggregator[]>, TopNParams> { private static final int[] EMPTY_INTS = new int[]{}; @@ -74,17 +75,16 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm makeDimValAggregateStore(TopNParams params) + protected Map, Aggregator[]> makeDimValAggregateStore(TopNParams params) { - return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore(); + return new HashMap<>(); } @Override protected long scanAndAggregate( TopNParams params, int[] dimValSelector, - Map aggregatesStore + Map, Aggregator[]> aggregatesStore ) { if (params.getCardinality() < 0) { @@ -96,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); Aggregator[] theAggregators = aggregatesStore.get(key); if (theAggregators == null) { @@ -118,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm aggregatesStore, + Map, Aggregator[]> aggregatesStore, TopNResultBuilder resultBuilder ) { - for (Map.Entry entry : aggregatesStore.entrySet()) { + for (Map.Entry, Aggregator[]> entry : aggregatesStore.entrySet()) { Aggregator[] aggs = entry.getValue(); if (aggs != null) { Object[] vals = new Object[aggs.length]; @@ -140,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm stringMap) + protected void closeAggregators(Map, Aggregator[]> stringMap) { for (Aggregator[] aggregators : stringMap.values()) { for (Aggregator agg : aggregators) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java index 0f7a52707a0..31b4d9204e8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java @@ -21,7 +21,7 @@ package org.apache.druid.query.topn; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import javax.annotation.Nullable; @@ -34,7 +34,7 @@ public interface TopNAlgorithm int INIT_POSITION_VALUE = -1; int SKIP_POSITION_VALUE = -2; - TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); + TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); void run( Parameters params, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java index af12047005a..96fb62f9012 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java @@ -21,7 +21,8 @@ package org.apache.druid.query.topn; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.Result; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionHandlerUtils; @@ -42,13 +43,15 @@ public class TopNMapFn } @SuppressWarnings("unchecked") + @Nullable public Result apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics) { - final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( - new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()), - query.getDimensionSpec(), - cursor.getColumnSelectorFactory() - ); + final ColumnSelectorPlus> selectorPlus = + DimensionHandlerUtils.createColumnSelectorPlus( + new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()), + query.getDimensionSpec(), + cursor.getColumnSelectorFactory() + ); if (selectorPlus.getSelector() == null) { return null; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java index a80e1bd1f73..cdc541f9924 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java @@ -20,7 +20,7 @@ package org.apache.druid.query.topn; import org.apache.druid.query.ColumnSelectorPlus; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; @@ -28,13 +28,14 @@ import org.apache.druid.segment.DimensionSelector; */ public class TopNParams { + public static final int CARDINALITY_UNKNOWN = -1; private final Cursor cursor; private final int cardinality; private final int numValuesPerPass; - private final ColumnSelectorPlus selectorPlus; + private final ColumnSelectorPlus selectorPlus; protected TopNParams( - ColumnSelectorPlus selectorPlus, + ColumnSelectorPlus selectorPlus, Cursor cursor, int numValuesPerPass ) @@ -52,7 +53,7 @@ public class TopNParams return (DimensionSelector) selectorPlus.getSelector(); } - public ColumnSelectorPlus getSelectorPlus() + public ColumnSelectorPlus getSelectorPlus() { return selectorPlus; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 0b2fb487354..dfe180971ba 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -19,7 +19,6 @@ package org.apache.druid.query.topn; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import org.apache.druid.collections.NonBlockingPool; @@ -30,7 +29,6 @@ import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.Filter; -import org.apache.druid.segment.Cursor; import org.apache.druid.segment.SegmentMissingException; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnCapabilities; @@ -86,16 +84,11 @@ public class TopNQueryEngine query.isDescending(), queryMetrics ), - new Function>() - { - @Override - public Result apply(Cursor input) - { - if (queryMetrics != null) { - queryMetrics.cursor(input); - } - return mapFn.apply(input, queryMetrics); + input -> { + if (queryMetrics != null) { + queryMetrics.cursor(input); } + return mapFn.apply(input, queryMetrics); } ), Predicates.notNull() @@ -125,7 +118,8 @@ public class TopNQueryEngine final ColumnCapabilities columnCapabilities = query.getVirtualColumns() .getColumnCapabilitiesWithFallback(adapter, dimension); - final TopNAlgorithm topNAlgorithm; + + final TopNAlgorithm topNAlgorithm; if ( selector.isHasExtractionFn() && // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long. @@ -137,20 +131,23 @@ public class TopNQueryEngine // currently relies on the dimension cardinality to support lexicographic sorting topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query); } else if (selector.isHasExtractionFn()) { - topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { - // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings. - topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings. + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) { - // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be + // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be // a many-to-one mapping, since numeric types can't represent all possible values of other types.) - topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (selector.isAggregateAllMetrics()) { + // sorted by dimension topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool); } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) { + // high cardinality dimensions with larger result sets topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool); } else { + // anything else topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool); } if (queryMetrics != null) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java new file mode 100644 index 00000000000..2a2781d45b9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class DoubleTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Long2ObjectMap aggregatesStore; + + protected DoubleTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators( + TopNQuery query, + BaseDoubleColumnValueSelector selector, + Cursor cursor + ) + { + long key = Double.doubleToLongBits(selector.getDouble()); + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + aggregatesStore = new Long2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java new file mode 100644 index 00000000000..4e8dd8839d5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class FloatTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Int2ObjectMap aggregatesStore; + + protected FloatTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators( + TopNQuery query, + BaseFloatColumnValueSelector selector, + Cursor cursor + ) + { + int key = Float.floatToIntBits(selector.getFloat()); + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + this.aggregatesStore = new Int2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java new file mode 100644 index 00000000000..d28d8a81397 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class LongTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Long2ObjectMap aggregatesStore; + + public LongTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor) + { + long key = selector.getLong(); + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + aggregatesStore = new Long2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(aggregatorStoreKey); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java new file mode 100644 index 00000000000..0b8e90c1659 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNParams; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNResultBuilder; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.StorageAdapter; + +import java.util.Map; +import java.util.function.Function; + +/** + * Base {@link TopNColumnAggregatesProcessor} for {@link BaseNullableColumnValueSelector}. Non-null selector values + * aggregates are stored in a type appropriate primitive map, created by {@link #initAggregateStore()} and available + * via {@link #getAggregatesStore()}, and null valued row aggregates are stored in a separate + * {@link #nullValueAggregates} {@link Aggregator} array. + * + * {@link #updateResults} will combine both the map and null aggregates to populate the {@link TopNResultBuilder} with + * the values produced by {@link #scanAndAggregate}. + */ +public abstract class NullableNumericTopNColumnAggregatesProcessor + implements TopNColumnAggregatesProcessor +{ + private final boolean hasNulls = !NullHandling.replaceWithDefault(); + final Function> converter; + Aggregator[] nullValueAggregates; + + protected NullableNumericTopNColumnAggregatesProcessor(Function> converter) + { + this.converter = converter; + } + + /** + * Get {@link Aggregator} set for the current {@param Selector} row value for a given {@link Cursor} + */ + abstract Aggregator[] getValueAggregators(TopNQuery query, Selector selector, Cursor cursor); + + /** + * Get primitive numeric map for value aggregates created by {@link #scanAndAggregate}, to be used by + * {@link #updateResults} to apply to the {@link TopNResultBuilder} + */ + abstract Map getAggregatesStore(); + + /** + * Method to convert primitive numeric value keys used by {@link #getAggregatesStore} into the correct representation + * for the {@link TopNResultBuilder}, called by {@link #updateResults} + */ + abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey); + + @Override + public int getCardinality(Selector selector) + { + return TopNParams.CARDINALITY_UNKNOWN; + } + + @Override + public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) + { + return null; + } + + @Override + public long scanAndAggregate( + TopNQuery query, + Selector selector, + Cursor cursor, + Aggregator[][] rowSelector + ) + { + long processedRows = 0; + while (!cursor.isDone()) { + if (hasNulls && selector.isNull()) { + if (nullValueAggregates == null) { + nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + } + for (Aggregator aggregator : nullValueAggregates) { + aggregator.aggregate(); + } + } else { + Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor); + for (Aggregator aggregator : valueAggregates) { + aggregator.aggregate(); + } + } + cursor.advance(); + processedRows++; + } + return processedRows; + } + + + @Override + public void updateResults(TopNResultBuilder resultBuilder) + { + for (Map.Entry entry : getAggregatesStore().entrySet()) { + Aggregator[] aggs = entry.getValue(); + if (aggs != null) { + Object[] vals = new Object[aggs.length]; + for (int i = 0; i < aggs.length; i++) { + vals[i] = aggs[i].get(); + } + + final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey()); + resultBuilder.addEntry(key, key, vals); + } + } + + if (nullValueAggregates != null) { + Object[] nullVals = new Object[nullValueAggregates.length]; + for (int i = 0; i < nullValueAggregates.length; i++) { + nullVals[i] = nullValueAggregates[i].get(); + } + + resultBuilder.addEntry(null, null, nullVals); + } + } + + @Override + public void closeAggregators() + { + for (Aggregator[] aggregators : getAggregatesStore().values()) { + for (Aggregator agg : aggregators) { + agg.close(); + } + } + + if (nullValueAggregates != null) { + for (Aggregator nullAgg : nullValueAggregates) { + nullAgg.close(); + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java deleted file mode 100644 index e27baf4e202..00000000000 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.topn.types; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.BaseTopNAlgorithm; -import org.apache.druid.query.topn.TopNParams; -import org.apache.druid.query.topn.TopNQuery; -import org.apache.druid.query.topn.TopNResultBuilder; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; -import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.Cursor; -import org.apache.druid.segment.DimensionHandlerUtils; -import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ValueType; - -import java.util.Map; -import java.util.function.Function; - -public abstract class NumericTopNColumnSelectorStrategy< - ValueSelectorType, - DimExtractionAggregateStoreType extends Map> - implements TopNColumnSelectorStrategy -{ - public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType) - { - final Function> converter = DimensionHandlerUtils.converterFromTypeToType( - selectorType, - dimensionType - ); - - switch (selectorType) { - case LONG: - return new OfLong(converter); - case FLOAT: - return new OfFloat(converter); - case DOUBLE: - return new OfDouble(converter); - default: - throw new IAE("No strategy for type[%s]", selectorType); - } - } - - @Override - public int getCardinality(ValueSelectorType selector) - { - return TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN; - } - - @Override - public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) - { - return null; - } - - static long floatDimExtractionScanAndAggregate( - TopNQuery query, - BaseFloatColumnValueSelector selector, - Cursor cursor, - Int2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - int key = Float.floatToIntBits(selector.getFloat()); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - static long doubleDimExtractionScanAndAggregate( - TopNQuery query, - BaseDoubleColumnValueSelector selector, - Cursor cursor, - Long2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - long key = Double.doubleToLongBits(selector.getDouble()); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - static long longDimExtractionScanAndAggregate( - TopNQuery query, - BaseLongColumnValueSelector selector, - Cursor cursor, - Long2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - long key = selector.getLong(); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - @Override - public void updateDimExtractionResults( - final DimExtractionAggregateStoreType aggregatesStore, - final TopNResultBuilder resultBuilder - ) - { - for (Map.Entry entry : aggregatesStore.entrySet()) { - Aggregator[] aggs = entry.getValue(); - if (aggs != null) { - Object[] vals = new Object[aggs.length]; - for (int i = 0; i < aggs.length; i++) { - vals[i] = aggs[i].get(); - } - - final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey()); - resultBuilder.addEntry(key, key, vals); - } - } - } - - abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey); - - static class OfFloat - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfFloat(final Function> converter) - { - this.converter = converter; - } - - @Override - public Int2ObjectMap makeDimExtractionAggregateStore() - { - return new Int2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey)); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseFloatColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Int2ObjectMap aggregatesStore - ) - { - return floatDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } - - static class OfLong - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfLong(final Function> converter) - { - this.converter = converter; - } - - @Override - public Long2ObjectMap makeDimExtractionAggregateStore() - { - return new Long2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(aggregatorStoreKey); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseLongColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Long2ObjectMap aggregatesStore - ) - { - return longDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } - - static class OfDouble - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfDouble(final Function> converter) - { - this.converter = converter; - } - - @Override - public Long2ObjectMap makeDimExtractionAggregateStore() - { - return new Long2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey)); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseDoubleColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Long2ObjectMap aggregatesStore - ) - { - return doubleDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } -} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java similarity index 66% rename from processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index f5e838d42b6..cea7c775a75 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -36,12 +36,12 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Function; -public class StringTopNColumnSelectorStrategy - implements TopNColumnSelectorStrategy, Aggregator[]>> +public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor { private final Function> dimensionValueConverter; + private HashMap, Aggregator[]> aggregatesStore; - public StringTopNColumnSelectorStrategy(final ValueType dimensionType) + public StringTopNColumnAggregatesProcessor(final ValueType dimensionType) { this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType); } @@ -53,7 +53,7 @@ public class StringTopNColumnSelectorStrategy } @Override - public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) + public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) { if (params.getCardinality() < 0) { throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); @@ -74,34 +74,9 @@ public class StringTopNColumnSelectorStrategy } @Override - public Map, Aggregator[]> makeDimExtractionAggregateStore() + public void updateResults(TopNResultBuilder resultBuilder) { - return new HashMap<>(); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - DimensionSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Map, Aggregator[]> aggregatesStore - ) - { - if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { - return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore); - } else { - return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore); - } - } - - @Override - public void updateDimExtractionResults( - final Map, Aggregator[]> aggregatesStore, - final TopNResultBuilder resultBuilder - ) - { - for (Map.Entry, Aggregator[]> entry : aggregatesStore.entrySet()) { + for (Map.Entry entry : aggregatesStore.entrySet()) { Aggregator[] aggs = entry.getValue(); if (aggs != null) { Object[] vals = new Object[aggs.length]; @@ -115,12 +90,42 @@ public class StringTopNColumnSelectorStrategy } } - private long dimExtractionScanAndAggregateWithCardinalityKnown( + @Override + public void closeAggregators() + { + for (Aggregator[] aggregators : aggregatesStore.values()) { + for (Aggregator agg : aggregators) { + agg.close(); + } + } + } + + @Override + public long scanAndAggregate( + TopNQuery query, + DimensionSelector selector, + Cursor cursor, + Aggregator[][] rowSelector + ) + { + if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { + return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); + } else { + return scanAndAggregateWithCardinalityUnknown(query, cursor, selector); + } + } + + @Override + public void initAggregateStore() + { + this.aggregatesStore = new HashMap<>(); + } + + private long scanAndAggregateWithCardinalityKnown( TopNQuery query, Cursor cursor, DimensionSelector selector, - Aggregator[][] rowSelector, - Map, Aggregator[]> aggregatesStore + Aggregator[][] rowSelector ) { long processedRows = 0; @@ -128,18 +133,17 @@ public class StringTopNColumnSelectorStrategy final IndexedInts dimValues = selector.getRow(); for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); - Aggregator[] theAggregators = rowSelector[dimIndex]; - if (theAggregators == null) { + Aggregator[] aggs = rowSelector[dimIndex]; + if (aggs == null) { final Comparable key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - rowSelector[dimIndex] = theAggregators; + aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + rowSelector[dimIndex] = aggs; } - for (Aggregator aggregator : theAggregators) { + for (Aggregator aggregator : aggs) { aggregator.aggregate(); } } @@ -149,11 +153,10 @@ public class StringTopNColumnSelectorStrategy return processedRows; } - private long dimExtractionScanAndAggregateWithCardinalityUnknown( + private long scanAndAggregateWithCardinalityUnknown( TopNQuery query, Cursor cursor, - DimensionSelector selector, - Map, Aggregator[]> aggregatesStore + DimensionSelector selector ) { long processedRows = 0; @@ -162,13 +165,11 @@ public class StringTopNColumnSelectorStrategy for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); final Comparable key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { + Aggregator[] aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + for (Aggregator aggregator : aggs) { aggregator.aggregate(); } } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java similarity index 51% rename from processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java index 464189d487d..ac5b21ff123 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java @@ -21,28 +21,39 @@ package org.apache.druid.query.topn.types; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.dimension.ColumnSelectorStrategy; +import org.apache.druid.query.topn.HeapBasedTopNAlgorithm; import org.apache.druid.query.topn.TopNParams; import org.apache.druid.query.topn.TopNQuery; import org.apache.druid.query.topn.TopNResultBuilder; +import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; -import java.util.Map; +import javax.annotation.Nullable; -public interface TopNColumnSelectorStrategy - extends ColumnSelectorStrategy +/** + * This {@link ColumnSelectorStrategy} is used by all {@link org.apache.druid.query.topn.TopNAlgorithm} to provide + * selector value cardinality to {@link TopNParams} (perhaps unecessarily, but that is another matter), but is primarily + * used by {@link HeapBasedTopNAlgorithm} to serve as its value aggregates store. + * + * Given a query, column value selector, and cursor to process, the aggregates store is populated by calling + * {@link #scanAndAggregate} and can be applied to {@link TopNResultBuilder} through {@link #updateResults}. + */ +public interface TopNColumnAggregatesProcessor extends ColumnSelectorStrategy { - int CARDINALITY_UNKNOWN = -1; - + /** + * Get value cardinality of underlying {@link ColumnValueSelector} + */ int getCardinality(ValueSelectorType selector); /** - * Used by DimExtractionTopNAlgorithm. + * Used by {@link HeapBasedTopNAlgorithm}. * - * Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters. + * Create an Aggregator[][] using {@link org.apache.druid.query.topn.BaseTopNAlgorithm.AggregatorArrayProvider} and + * the given parameters. * * As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types - * that use integer row values. + * that use integer row values, e.g. string columns where the value cardinality is known. * * A dimension type that does not have integer values should return null. * @@ -53,30 +64,24 @@ public interface TopNColumnSelectorStrategy> +{ + private final ValueType dimensionType; + + public TopNColumnAggregatesProcessorFactory(final ValueType dimensionType) + { + this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); + } + + @Override + public TopNColumnAggregatesProcessor makeColumnSelectorStrategy( + ColumnCapabilities capabilities, + ColumnValueSelector selector + ) + { + final ValueType selectorType = capabilities.getType(); + + if (selectorType.equals(ValueType.STRING)) { + return new StringTopNColumnAggregatesProcessor(dimensionType); + } else if (selectorType.isNumeric()) { + final Function> converter; + final ValueType strategyType; + // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using + // a numeric type and then converts to the desired output type after aggregating. We must be careful not to + // convert to an output type that cannot represent all possible values of the input type. + if (ValueType.isNumeric(dimensionType)) { + // Return strategy that aggregates using the _output_ type, because this allows us to collapse values + // properly (numeric types cannot always represent all values of other numeric types). + converter = DimensionHandlerUtils.converterFromTypeToType(dimensionType, dimensionType); + strategyType = dimensionType; + } else { + // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can + // represent all possible values of the input type. This will be true for STRING, which is the only + // non-numeric type currently supported. + converter = DimensionHandlerUtils.converterFromTypeToType(selectorType, dimensionType); + strategyType = selectorType; + } + switch (strategyType) { + case LONG: + return new LongTopNColumnAggregatesProcessor(converter); + case FLOAT: + return new FloatTopNColumnAggregatesProcessor(converter); + case DOUBLE: + return new DoubleTopNColumnAggregatesProcessor(converter); + } + } + + throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java deleted file mode 100644 index 8cc820d04ba..00000000000 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.topn.types; - -import com.google.common.base.Preconditions; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.column.ValueType; - -public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory -{ - private final ValueType dimensionType; - - public TopNColumnSelectorStrategyFactory(final ValueType dimensionType) - { - this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); - } - - @Override - public TopNColumnSelectorStrategy makeColumnSelectorStrategy( - ColumnCapabilities capabilities, - ColumnValueSelector selector - ) - { - final ValueType selectorType = capabilities.getType(); - - switch (selectorType) { - case STRING: - // Return strategy that reads strings and outputs dimensionTypes. - return new StringTopNColumnSelectorStrategy(dimensionType); - case LONG: - case FLOAT: - case DOUBLE: - // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using - // a numeric type and then converts to the desired output type after aggregating. We must be careful not to - // convert to an output type that cannot represent all possible values of the input type. - - if (ValueType.isNumeric(dimensionType)) { - // Return strategy that aggregates using the _output_ type, because this allows us to collapse values - // properly (numeric types cannot always represent all values of other numeric types). - return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType); - } else { - // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can - // represent all possible values of the input type. This will be true for STRING, which is the only - // non-numeric type currently supported. - return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType); - } - default: - throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType); - } - } -} diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java index 568d4c8f2de..0e4d46d36f4 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java @@ -63,9 +63,9 @@ public final class DimensionHandlerUtils .setDictionaryEncoded(true) .setHasBitmapIndexes(true); - public static DimensionHandler getHandlerFromCapabilities( + public static DimensionHandler getHandlerFromCapabilities( String dimensionName, - ColumnCapabilities capabilities, + @Nullable ColumnCapabilities capabilities, @Nullable MultiValueHandling multiValueHandling ) { @@ -112,7 +112,7 @@ public final class DimensionHandlerUtils * {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton * list of dimensionSpecs and then retrieving the only element in the returned array. * - * @param The strategy type created by the provided strategy factory. + * @param The strategy type created by the provided strategy factory. * @param strategyFactory A factory provided by query engines that generates type-handling strategies * @param dimensionSpec column to generate a ColumnSelectorPlus object for * @param cursor Used to create value selectors for columns. @@ -121,8 +121,8 @@ public final class DimensionHandlerUtils * * @see ColumnProcessors#makeProcessor which may replace this in the future */ - public static ColumnSelectorPlus createColumnSelectorPlus( - ColumnSelectorStrategyFactory strategyFactory, + public static ColumnSelectorPlus createColumnSelectorPlus( + ColumnSelectorStrategyFactory strategyFactory, DimensionSpec dimensionSpec, ColumnSelectorFactory cursor ) @@ -141,7 +141,7 @@ public final class DimensionHandlerUtils * A caller should define a strategy factory that provides an interface for type-specific operations * in a query engine. See GroupByStrategyFactory for a reference. * - * @param The strategy type created by the provided strategy factory. + * @param The strategy type created by the provided strategy factory. * @param strategyFactory A factory provided by query engines that generates type-handling strategies * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for * @param columnSelectorFactory Used to create value selectors for columns. @@ -150,32 +150,29 @@ public final class DimensionHandlerUtils * * @see ColumnProcessors#makeProcessor which may replace this in the future */ - public static - //CHECKSTYLE.OFF: Indentation - ColumnSelectorPlus[] createColumnSelectorPluses( - //CHECKSTYLE.ON: Indentation - ColumnSelectorStrategyFactory strategyFactory, + public static ColumnSelectorPlus[] createColumnSelectorPluses( + ColumnSelectorStrategyFactory strategyFactory, List dimensionSpecs, ColumnSelectorFactory columnSelectorFactory ) { int dimCount = dimensionSpecs.size(); @SuppressWarnings("unchecked") - ColumnSelectorPlus[] dims = new ColumnSelectorPlus[dimCount]; + ColumnSelectorPlus[] dims = new ColumnSelectorPlus[dimCount]; for (int i = 0; i < dimCount; i++) { final DimensionSpec dimSpec = dimensionSpecs.get(i); final String dimName = dimSpec.getDimension(); - final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( + final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( dimSpec, columnSelectorFactory ); - ColumnSelectorStrategyClass strategy = makeStrategy( + Strategy strategy = makeStrategy( strategyFactory, dimSpec, columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()), selector ); - final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( + final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( dimName, dimSpec.getOutputName(), strategy, @@ -186,7 +183,7 @@ public final class DimensionHandlerUtils return dims; } - private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( + private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( DimensionSpec dimSpec, ColumnSelectorFactory columnSelectorFactory ) @@ -194,12 +191,10 @@ public final class DimensionHandlerUtils String dimName = dimSpec.getDimension(); ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName); capabilities = getEffectiveCapabilities(dimSpec, capabilities); - switch (capabilities.getType()) { - case STRING: - return columnSelectorFactory.makeDimensionSelector(dimSpec); - default: - return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension()); + if (capabilities.getType() == ValueType.STRING) { + return columnSelectorFactory.makeDimensionSelector(dimSpec); } + return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension()); } /** @@ -238,11 +233,11 @@ public final class DimensionHandlerUtils return capabilities; } - private static ColumnSelectorStrategyClass makeStrategy( - ColumnSelectorStrategyFactory strategyFactory, + private static Strategy makeStrategy( + ColumnSelectorStrategyFactory strategyFactory, DimensionSpec dimSpec, @Nullable ColumnCapabilities capabilities, - ColumnValueSelector selector + ColumnValueSelector selector ) { capabilities = getEffectiveCapabilities(dimSpec, capabilities); diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java index b5181b1d251..7a8d7d4fa8f 100644 --- a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java +++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java @@ -188,6 +188,7 @@ public class VirtualColumns implements Cacheable } } + @Nullable public DimensionSelector makeDimensionSelector( DimensionSpec dimensionSpec, ColumnSelector columnSelector, @@ -202,6 +203,7 @@ public class VirtualColumns implements Cacheable } } + @Nullable public ColumnValueSelector makeColumnValueSelector( String columnName, ColumnSelector columnSelector, @@ -253,6 +255,7 @@ public class VirtualColumns implements Cacheable } } + @Nullable public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName) { final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 0ba5860dc74..7340da59037 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -100,6 +100,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -5832,9 +5833,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest .put("index_alias", 147L) .put("longNumericNull", 10L) .build(), - makeNumericNullRowHelper("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()), - makeNumericNullRowHelper("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()), - makeNumericNullRowHelper("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue()) + makeRowWithNulls("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()), + makeRowWithNulls("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()), + makeRowWithNulls("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue()) ) ) ) @@ -5900,9 +5901,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest .put("index_alias", 147L) .put("floatNumericNull", 10f) .build(), - makeNumericNullRowHelper("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()), - makeNumericNullRowHelper("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()), - makeNumericNullRowHelper("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue()) + makeRowWithNulls("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()), + makeRowWithNulls("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()), + makeRowWithNulls("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue()) ) ) ) @@ -5968,9 +5969,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest .put("index_alias", 147L) .put("doubleNumericNull", 10d) .build(), - makeNumericNullRowHelper("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()), - makeNumericNullRowHelper("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()), - makeNumericNullRowHelper("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue()) + makeRowWithNulls("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()), + makeRowWithNulls("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()), + makeRowWithNulls("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue()) ) ) ) @@ -5978,16 +5979,113 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest assertExpectedResults(expectedResults, query); } - private static Map makeNumericNullRowHelper( + + @Test + public void testAggregateOnLongNumericNull() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("longNumericNull", "dim", ValueType.LONG)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L), + makeRowWithNulls("dim", 10L, "count", 93L), + makeRowWithNulls("dim", 20L, "count", 93L), + makeRowWithNulls("dim", 40L, "count", 93L), + makeRowWithNulls("dim", 50L, "count", 279L), + makeRowWithNulls("dim", 70L, "count", 279L), + makeRowWithNulls("dim", 80L, "count", 93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testAggregateOnDoubleNumericNull() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("doubleNumericNull", "dim", ValueType.DOUBLE)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L), + makeRowWithNulls("dim", 10.0, "count", 93L), + makeRowWithNulls("dim", 20.0, "count", 93L), + makeRowWithNulls("dim", 40.0, "count", 93L), + makeRowWithNulls("dim", 50.0, "count", 279L), + makeRowWithNulls("dim", 70.0, "count", 279L), + makeRowWithNulls("dim", 80.0, "count", 93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testAggregateOnFloatNumericNull() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("floatNumericNull", "dim", ValueType.FLOAT)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L), + makeRowWithNulls("dim", 10.0f, "count", 93L), + makeRowWithNulls("dim", 20.0f, "count", 93L), + makeRowWithNulls("dim", 40.0f, "count", 93L), + makeRowWithNulls("dim", 50.0f, "count", 279L), + makeRowWithNulls("dim", 70.0f, "count", 279L), + makeRowWithNulls("dim", 80.0f, "count", 93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + private static Map makeRowWithNulls( String dimName, - Object dimValue, - String nameOfColumnWithNull, - Object defaultNullValue + @Nullable Object dimValue, + String metric, + @Nullable Object metricVal ) { Map nullRow = new HashMap<>(); nullRow.put(dimName, dimValue); - nullRow.put(nameOfColumnWithNull, defaultNullValue); + nullRow.put(metric, metricVal); return nullRow; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 208db301e07..65f4628ad34 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -2683,6 +2683,126 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testNullDoubleTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{1.7, 1L}, + new Object[]{1.0, 1L}, + new Object[]{0.0, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{1.7, 1L}, + new Object[]{1.0, 1L}, + new Object[]{0.0, 1L} + ); + } + testQuery( + "SELECT d1, COUNT(*) FROM druid.numfoo GROUP BY d1 ORDER BY d1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("d1", "_d0", ValueType.DOUBLE)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + + @Test + public void testNullFloatTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{1.0f, 1L}, + new Object[]{0.1f, 1L}, + new Object[]{0.0f, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{1.0f, 1L}, + new Object[]{0.1f, 1L}, + new Object[]{0.0f, 1L} + ); + } + testQuery( + "SELECT f1, COUNT(*) FROM druid.numfoo GROUP BY f1 ORDER BY f1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("f1", "_d0", ValueType.FLOAT)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + + @Test + public void testNullLongTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{325323L, 1L}, + new Object[]{7L, 1L}, + new Object[]{0L, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{325323L, 1L}, + new Object[]{7L, 1L}, + new Object[]{0L, 1L} + ); + } + testQuery( + "SELECT l1, COUNT(*) FROM druid.numfoo GROUP BY l1 ORDER BY l1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("l1", "_d0", ValueType.LONG)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + @Test public void testEmptyStringEquality() throws Exception {