From c078ed40fd9e24e2008178156953a384a1c910e1 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Tue, 8 Oct 2019 15:35:07 -0700 Subject: [PATCH] groupBy query: optional limit push down to segment scan (#8426) * groupBy query: optional limit push down to segment scan * make segment level limit push down configurable * fix teamcity errors * fix segment limit pushdown flag handling on query level config override * use equals for comparator check * fix sql and null handling * fix unused imports * handle null offset in NullableValueGroupByColumnSelectorStrategy for buffer comparator similar to RowBasedGrouperHelper.NullableRowBasedKeySerdeHelper --- docs/querying/groupbyquery.md | 2 + .../query/groupby/GroupByQueryConfig.java | 13 + .../epinephelinae/GroupByQueryEngineV2.java | 117 ++++- .../GrouperBufferComparatorUtils.java | 436 ++++++++++++++++++ .../epinephelinae/RowBasedGrouperHelper.java | 329 ++----------- ...ngStringGroupByColumnSelectorStrategy.java | 22 + .../DoubleGroupByColumnSelectorStrategy.java | 14 +- .../FloatGroupByColumnSelectorStrategy.java | 16 + .../column/GroupByColumnSelectorStrategy.java | 11 + .../LongGroupByColumnSelectorStrategy.java | 16 + ...bleValueGroupByColumnSelectorStrategy.java | 16 +- .../StringGroupByColumnSelectorStrategy.java | 31 ++ .../query/groupby/GroupByQueryConfigTest.java | 4 +- 13 files changed, 710 insertions(+), 317 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index c6230621f00..a7dc72558db 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -412,6 +412,7 @@ Supported runtime properties: |`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false| |`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. 
Higher degrees will need less threads which might be helpful to improve the query performance by reducing the overhead of too many threads if the server has sufficiently powerful cpu cores.|8| |`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)| +|`druid.query.groupBy.applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan.|true (enabled)| Supported query contexts: @@ -424,6 +425,7 @@ Supported query contexts: |`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None| |`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false| |`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false| +|`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. 
This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true| ##### GroupBy v1 configurations diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 92d9e9e4d05..17ca92905b6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -29,6 +29,7 @@ public class GroupByQueryConfig public static final String CTX_KEY_STRATEGY = "groupByStrategy"; public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown"; public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown"; + public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT = "applyLimitPushDownToSegment"; public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery"; public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery"; public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray"; @@ -78,6 +79,9 @@ public class GroupByQueryConfig @JsonProperty private boolean forcePushDownLimit = false; + @JsonProperty + private boolean applyLimitPushDownToSegment = true; + @JsonProperty private boolean forcePushDownNestedQuery = false; @@ -158,6 +162,11 @@ public class GroupByQueryConfig return forcePushDownLimit; } + public boolean isApplyLimitPushDownToSegment() + { + return applyLimitPushDownToSegment; + } + public boolean isForceHashAggregation() { return forceHashAggregation; @@ -220,6 +229,10 @@ public class GroupByQueryConfig getMaxMergingDictionarySize() ); newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit()); + newConfig.applyLimitPushDownToSegment = query.getContextBoolean( + CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT, + isApplyLimitPushDownToSegment() + ); 
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation()); newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery()); newConfig.intermediateCombineDegree = query.getContextValue( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index c26e515d0fe..d823d8a6137 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorAdapters; @@ -50,7 +51,9 @@ import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSele import org.apache.druid.query.groupby.epinephelinae.column.NullableValueGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -340,7 +343,7 @@ public class GroupByQueryEngineV2 case STRING: DimensionSelector 
dimSelector = (DimensionSelector) selector; if (dimSelector.getValueCardinality() >= 0) { - return new StringGroupByColumnSelectorStrategy(); + return new StringGroupByColumnSelectorStrategy(dimSelector::lookupName); } else { return new DictionaryBuildingStringGroupByColumnSelectorStrategy(); } @@ -393,7 +396,7 @@ public class GroupByQueryEngineV2 this.querySpecificConfig = querySpecificConfig; this.cursor = cursor; this.buffer = buffer; - this.keySerde = new GroupByEngineKeySerde(dims); + this.keySerde = new GroupByEngineKeySerde(dims, query); this.dims = dims; // Time is the same for every row in the cursor @@ -516,6 +519,8 @@ public class GroupByQueryEngineV2 private static class HashAggregateIterator extends GroupByEngineIterator { + private static final Logger LOGGER = new Logger(HashAggregateIterator.class); + private final int[] stack; private final Object[] valuess; private final ByteBuffer keyBuffer; @@ -544,18 +549,54 @@ public class GroupByQueryEngineV2 @Override protected Grouper newGrouper() { - return new BufferHashGrouper<>( - Suppliers.ofInstance(buffer), - keySerde, - AggregatorAdapters.factorizeBuffered( - cursor.getColumnSelectorFactory(), - query.getAggregatorSpecs() - ), - querySpecificConfig.getBufferGrouperMaxSize(), - querySpecificConfig.getBufferGrouperMaxLoadFactor(), - querySpecificConfig.getBufferGrouperInitialBuckets(), - true - ); + Grouper grouper = null; + final DefaultLimitSpec limitSpec = query.isApplyLimitPushDown() && + querySpecificConfig.isApplyLimitPushDownToSegment() ? 
+ (DefaultLimitSpec) query.getLimitSpec() : null; + if (limitSpec != null) { + LimitedBufferHashGrouper limitGrouper = new LimitedBufferHashGrouper<>( + Suppliers.ofInstance(buffer), + keySerde, + AggregatorAdapters.factorizeBuffered( + cursor.getColumnSelectorFactory(), + query.getAggregatorSpecs() + ), + querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperMaxLoadFactor(), + querySpecificConfig.getBufferGrouperInitialBuckets(), + limitSpec.getLimit(), + DefaultLimitSpec.sortingOrderHasNonGroupingFields( + limitSpec, + query.getDimensions() + ) + ); + + if (limitGrouper.validateBufferCapacity(buffer.capacity())) { + grouper = limitGrouper; + } else { + LOGGER.warn( + "Limit is not applied in segment scan phase due to limited buffer capacity for query [%s].", + query.getId() + ); + } + } + + if (grouper == null) { + grouper = new BufferHashGrouper<>( + Suppliers.ofInstance(buffer), + keySerde, + AggregatorAdapters.factorizeBuffered( + cursor.getColumnSelectorFactory(), + query.getAggregatorSpecs() + ), + querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperMaxLoadFactor(), + querySpecificConfig.getBufferGrouperInitialBuckets(), + true + ); + } + + return grouper; } @Override @@ -808,14 +849,19 @@ public class GroupByQueryEngineV2 private static class GroupByEngineKeySerde implements Grouper.KeySerde { private final int keySize; + private final GroupByColumnSelectorPlus[] dims; + private final GroupByQuery query; - public GroupByEngineKeySerde(final GroupByColumnSelectorPlus[] dims) + public GroupByEngineKeySerde(final GroupByColumnSelectorPlus[] dims, GroupByQuery query) { + this.dims = dims; int keySize = 0; for (GroupByColumnSelectorPlus selectorPlus : dims) { keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize(); } this.keySize = keySize; + + this.query = query; } @Override @@ -853,8 +899,15 @@ public class GroupByQueryEngineV2 @Override public Grouper.BufferComparator 
bufferComparator() { - // No sorting, let mergeRunners handle that - throw new UnsupportedOperationException(); + Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down"); + DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec(); + + return GrouperBufferComparatorUtils.bufferComparator( + query.getResultRowHasTimestamp(), + query.getContextSortByDimsFirst(), + query.getDimensions().size(), + getDimensionComparators(limitSpec) + ); } @Override @@ -863,8 +916,34 @@ public class GroupByQueryEngineV2 int[] aggregatorOffsets ) { - // not called on this - throw new UnsupportedOperationException(); + Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down"); + DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec(); + + return GrouperBufferComparatorUtils.bufferComparatorWithAggregators( + query.getAggregatorSpecs().toArray(new AggregatorFactory[0]), + aggregatorOffsets, + limitSpec, + query.getDimensions(), + getDimensionComparators(limitSpec), + query.getResultRowHasTimestamp(), + query.getContextSortByDimsFirst() + ); + } + + private Grouper.BufferComparator[] getDimensionComparators(DefaultLimitSpec limitSpec) + { + Grouper.BufferComparator[] dimComparators = new Grouper.BufferComparator[dims.length]; + + for (int i = 0; i < dims.length; i++) { + final String dimName = query.getDimensions().get(i).getOutputName(); + StringComparator stringComparator = DefaultLimitSpec.getComparatorForDimName(limitSpec, dimName); + dimComparators[i] = dims[i].getColumnSelectorStrategy().bufferComparator( + dims[i].getKeyBufferPosition(), + stringComparator + ); + } + + return dimComparators; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java new file mode 100644 index 00000000000..d9c19552de4 --- /dev/null +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae; + +import com.google.common.primitives.Longs; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparator; +import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Set of utility methods to facilitate implementation of {@link Grouper.KeySerde#bufferComparator()} and + * {@link Grouper.KeySerde#bufferComparatorWithAggregators(AggregatorFactory[], int[])} + */ +public class
GrouperBufferComparatorUtils +{ + public static Grouper.BufferComparator bufferComparator( + boolean includeTimestamp, + boolean sortByDimsFirst, + int dimCount, + Grouper.BufferComparator[] serdeHelperComparators + ) + { + if (includeTimestamp) { + if (sortByDimsFirst) { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + final int cmp = compareDimsInBuffersForNullFudgeTimestamp( + serdeHelperComparators, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + if (cmp != 0) { + return cmp; + } + + return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + } + }; + } else { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + + if (timeCompare != 0) { + return timeCompare; + } + + return compareDimsInBuffersForNullFudgeTimestamp( + serdeHelperComparators, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + } + }; + } + } else { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + for (int i = 0; i < dimCount; i++) { + final int cmp = serdeHelperComparators[i].compare( + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + }; + } + } + + public static Grouper.BufferComparator bufferComparatorWithAggregators( + AggregatorFactory[] aggregatorFactories, + int[] aggregatorOffsets, + DefaultLimitSpec limitSpec, + List dimensions, + Grouper.BufferComparator[] dimComparators, + boolean includeTimestamp, + boolean sortByDimsFirst + ) + { + int dimCount = dimensions.size(); + final List needsReverses = new ArrayList<>(); + List comparators = 
new ArrayList<>(); + Set orderByIndices = new HashSet<>(); + + int aggCount = 0; + boolean needsReverse; + for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) { + needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; + int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); + if (dimIndex >= 0) { + comparators.add(dimComparators[dimIndex]); + orderByIndices.add(dimIndex); + needsReverses.add(needsReverse); + } else { + int aggIndex = OrderByColumnSpec.getAggIndexForOrderBy(orderSpec, Arrays.asList(aggregatorFactories)); + if (aggIndex >= 0) { + final StringComparator stringComparator = orderSpec.getDimensionComparator(); + final String typeName = aggregatorFactories[aggIndex].getTypeName(); + final int aggOffset = aggregatorOffsets[aggIndex] - Integer.BYTES; + + aggCount++; + + final ValueType valueType = ValueType.fromString(typeName); + if (!ValueType.isNumeric(valueType)) { + throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec); + } + + comparators.add(makeNullHandlingBufferComparatorForNumericData( + aggOffset, + makeNumericBufferComparator(valueType, aggOffset, true, stringComparator) + )); + needsReverses.add(needsReverse); + } + } + } + + for (int i = 0; i < dimCount; i++) { + if (!orderByIndices.contains(i)) { + comparators.add(dimComparators[i]); + needsReverses.add(false); // default to Ascending order if dim is not in an orderby spec + } + } + + + final Grouper.BufferComparator[] adjustedSerdeHelperComparators = comparators.toArray(new Grouper.BufferComparator[0]); + + final int fieldCount = dimCount + aggCount; + + if (includeTimestamp) { + if (sortByDimsFirst) { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( + adjustedSerdeHelperComparators, + needsReverses, + fieldCount, + lhsBuffer, + 
rhsBuffer, + lhsPosition, + rhsPosition + ); + if (cmp != 0) { + return cmp; + } + + return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + } + }; + } else { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + + if (timeCompare != 0) { + return timeCompare; + } + + int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( + adjustedSerdeHelperComparators, + needsReverses, + fieldCount, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + + return cmp; + } + }; + } + } else { + return new Grouper.BufferComparator() + { + @Override + public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) + { + for (int i = 0; i < fieldCount; i++) { + final int cmp; + if (needsReverses.get(i)) { + cmp = adjustedSerdeHelperComparators[i].compare( + rhsBuffer, + lhsBuffer, + rhsPosition, + lhsPosition + ); + } else { + cmp = adjustedSerdeHelperComparators[i].compare( + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + } + + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + }; + } + } + + private static int compareDimsInBuffersForNullFudgeTimestampForPushDown( + Grouper.BufferComparator[] serdeHelperComparators, + List needsReverses, + int dimCount, + ByteBuffer lhsBuffer, + ByteBuffer rhsBuffer, + int lhsPosition, + int rhsPosition + ) + { + for (int i = 0; i < dimCount; i++) { + final int cmp; + if (needsReverses.get(i)) { + cmp = serdeHelperComparators[i].compare( + rhsBuffer, + lhsBuffer, + rhsPosition + Long.BYTES, + lhsPosition + Long.BYTES + ); + } else { + cmp = serdeHelperComparators[i].compare( + lhsBuffer, + rhsBuffer, + lhsPosition + Long.BYTES, + rhsPosition + Long.BYTES + ); + } + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + + private static int 
compareDimsInBuffersForNullFudgeTimestamp( + Grouper.BufferComparator[] serdeHelperComparators, + ByteBuffer lhsBuffer, + ByteBuffer rhsBuffer, + int lhsPosition, + int rhsPosition + ) + { + for (Grouper.BufferComparator comparator : serdeHelperComparators) { + final int cmp = comparator.compare( + lhsBuffer, + rhsBuffer, + lhsPosition + Long.BYTES, + rhsPosition + Long.BYTES + ); + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + + private static Grouper.BufferComparator makeNumericBufferComparator( + ValueType valueType, + int keyBufferPosition, + boolean pushLimitDown, + @Nullable StringComparator stringComparator + ) + { + switch (valueType) { + case LONG: + return makeBufferComparatorForLong(keyBufferPosition, pushLimitDown, stringComparator); + case FLOAT: + return makeBufferComparatorForFloat(keyBufferPosition, pushLimitDown, stringComparator); + case DOUBLE: + return makeBufferComparatorForDouble(keyBufferPosition, pushLimitDown, stringComparator); + default: + throw new IAE("invalid type: %s", valueType); + } + } + + public static Grouper.BufferComparator makeBufferComparatorForLong( + int keyBufferPosition, + boolean pushLimitDown, + @Nullable StringComparator stringComparator + ) + { + if (isPrimitiveComparable(pushLimitDown, stringComparator)) { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Longs.compare( + lhsBuffer.getLong(lhsPosition + keyBufferPosition), + rhsBuffer.getLong(rhsPosition + keyBufferPosition) + ); + } else { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + long lhs = lhsBuffer.getLong(lhsPosition + keyBufferPosition); + long rhs = rhsBuffer.getLong(rhsPosition + keyBufferPosition); + + return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); + }; + } + } + + public static Grouper.BufferComparator makeBufferComparatorForDouble( + int keyBufferPosition, + boolean pushLimitDown, + @Nullable StringComparator stringComparator + ) + { + if (isPrimitiveComparable(pushLimitDown, 
stringComparator)) { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Double.compare( + lhsBuffer.getDouble(lhsPosition + keyBufferPosition), + rhsBuffer.getDouble(rhsPosition + keyBufferPosition) + ); + } else { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + double lhs = lhsBuffer.getDouble(lhsPosition + keyBufferPosition); + double rhs = rhsBuffer.getDouble(rhsPosition + keyBufferPosition); + return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); + }; + } + } + + public static Grouper.BufferComparator makeBufferComparatorForFloat( + int keyBufferPosition, + boolean pushLimitDown, + @Nullable StringComparator stringComparator + ) + { + if (isPrimitiveComparable(pushLimitDown, stringComparator)) { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Float.compare( + lhsBuffer.getFloat(lhsPosition + keyBufferPosition), + rhsBuffer.getFloat(rhsPosition + keyBufferPosition) + ); + } else { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + float lhs = lhsBuffer.getFloat(lhsPosition + keyBufferPosition); + float rhs = rhsBuffer.getFloat(rhsPosition + keyBufferPosition); + return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); + }; + } + } + + public static Grouper.BufferComparator makeNullHandlingBufferComparatorForNumericData( + int keyBufferPosition, + Grouper.BufferComparator delegate + ) + { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + boolean isLhsNull = (lhsBuffer.get(lhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE); + boolean isRhsNull = (rhsBuffer.get(rhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE); + if (isLhsNull && isRhsNull) { + // Both are null + return 0; + } + // only lhs is null + if (isLhsNull) { + return -1; + } + // only rhs is null + if (isRhsNull) { + return 1; + } + return delegate.compare( + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + }; + } + + private static boolean 
isPrimitiveComparable(boolean pushLimitDown, @Nullable StringComparator stringComparator) + { + return !pushLimitDown || stringComparator == null || stringComparator.equals(StringComparators.NUMERIC); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 9315b779aa5..d2663d4316a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1171,57 +1171,12 @@ public class RowBasedGrouperHelper initializeRankOfDictionaryIds(); } - if (includeTimestamp) { - if (sortByDimsFirst) { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - final int cmp = compareDimsInBuffersForNullFudgeTimestamp( - serdeHelperComparators, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - if (cmp != 0) { - return cmp; - } - - return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - }; - } else { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - - if (timeCompare != 0) { - return timeCompare; - } - - return compareDimsInBuffersForNullFudgeTimestamp( - serdeHelperComparators, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - }; - } - } else { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - for (int i = 0; i < dimCount; i++) { - final int cmp = serdeHelperComparators[i].compare( - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - - if (cmp != 0) { - return cmp; - } - } - - return 0; - }; - } + return GrouperBufferComparatorUtils.bufferComparator( + includeTimestamp, + sortByDimsFirst, + dimCount, + serdeHelperComparators + ); } @Override @@ -1230,127 +1185,15 @@ 
public class RowBasedGrouperHelper int[] aggregatorOffsets ) { - final List<RowBasedKeySerdeHelper> adjustedSerdeHelpers; - final List<Boolean> needsReverses = new ArrayList<>(); - List<RowBasedKeySerdeHelper> orderByHelpers = new ArrayList<>(); - List<RowBasedKeySerdeHelper> otherDimHelpers = new ArrayList<>(); - Set<Integer> orderByIndices = new HashSet<>(); - - int aggCount = 0; - boolean needsReverse; - for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) { - needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; - int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); - if (dimIndex >= 0) { - RowBasedKeySerdeHelper serdeHelper = serdeHelpers[dimIndex]; - orderByHelpers.add(serdeHelper); - orderByIndices.add(dimIndex); - needsReverses.add(needsReverse); - } else { - int aggIndex = OrderByColumnSpec.getAggIndexForOrderBy(orderSpec, Arrays.asList(aggregatorFactories)); - if (aggIndex >= 0) { - final RowBasedKeySerdeHelper serdeHelper; - final StringComparator stringComparator = orderSpec.getDimensionComparator(); - final String typeName = aggregatorFactories[aggIndex].getTypeName(); - final int aggOffset = aggregatorOffsets[aggIndex] - Integer.BYTES; - - aggCount++; - - final ValueType valueType = ValueType.fromString(typeName); - if (!ValueType.isNumeric(valueType)) { - throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec); - } - - serdeHelper = makeNullHandlingNumericserdeHelper(valueType, aggOffset, true, stringComparator); - - orderByHelpers.add(serdeHelper); - needsReverses.add(needsReverse); - } - } - } - - for (int i = 0; i < dimCount; i++) { - if (!orderByIndices.contains(i)) { - otherDimHelpers.add(serdeHelpers[i]); - needsReverses.add(false); // default to Ascending order if dim is not in an orderby spec - } - } - - adjustedSerdeHelpers = orderByHelpers; - adjustedSerdeHelpers.addAll(otherDimHelpers); - - final BufferComparator[] adjustedSerdeHelperComparators = new BufferComparator[adjustedSerdeHelpers.size()]; - Arrays.setAll(adjustedSerdeHelperComparators, i ->
adjustedSerdeHelpers.get(i).getBufferComparator()); - - final int fieldCount = dimCount + aggCount; - - if (includeTimestamp) { - if (sortByDimsFirst) { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( - adjustedSerdeHelperComparators, - needsReverses, - fieldCount, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - if (cmp != 0) { - return cmp; - } - - return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - }; - } else { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - - if (timeCompare != 0) { - return timeCompare; - } - - int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( - adjustedSerdeHelperComparators, - needsReverses, - fieldCount, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - - return cmp; - }; - } - } else { - return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - for (int i = 0; i < fieldCount; i++) { - final int cmp; - if (needsReverses.get(i)) { - cmp = adjustedSerdeHelperComparators[i].compare( - rhsBuffer, - lhsBuffer, - rhsPosition, - lhsPosition - ); - } else { - cmp = adjustedSerdeHelperComparators[i].compare( - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - } - - if (cmp != 0) { - return cmp; - } - } - - return 0; - }; - } + return GrouperBufferComparatorUtils.bufferComparatorWithAggregators( + aggregatorFactories, + aggregatorOffsets, + limitSpec, + dimensions, + serdeHelperComparators, + includeTimestamp, + sortByDimsFirst + ); } @Override @@ -1478,11 +1321,6 @@ public class RowBasedGrouperHelper } } - private static boolean isPrimitiveComparable(boolean pushLimitDown, @Nullable StringComparator stringComparator) - { - return !pushLimitDown || stringComparator == null || stringComparator.equals(StringComparators.NUMERIC); - } - private abstract class 
AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper { final int keyBufferPosition; @@ -1617,19 +1455,11 @@ public class RowBasedGrouperHelper ) { this.keyBufferPosition = keyBufferPosition; - if (isPrimitiveComparable(pushLimitDown, stringComparator)) { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Longs.compare( - lhsBuffer.getLong(lhsPosition + keyBufferPosition), - rhsBuffer.getLong(rhsPosition + keyBufferPosition) - ); - } else { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - long lhs = lhsBuffer.getLong(lhsPosition + keyBufferPosition); - long rhs = rhsBuffer.getLong(rhsPosition + keyBufferPosition); - - return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); - }; - } + bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForLong( + keyBufferPosition, + pushLimitDown, + stringComparator + ); } @Override @@ -1670,18 +1500,11 @@ public class RowBasedGrouperHelper ) { this.keyBufferPosition = keyBufferPosition; - if (isPrimitiveComparable(pushLimitDown, stringComparator)) { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Float.compare( - lhsBuffer.getFloat(lhsPosition + keyBufferPosition), - rhsBuffer.getFloat(rhsPosition + keyBufferPosition) - ); - } else { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - float lhs = lhsBuffer.getFloat(lhsPosition + keyBufferPosition); - float rhs = rhsBuffer.getFloat(rhsPosition + keyBufferPosition); - return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); - }; - } + bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForFloat( + keyBufferPosition, + pushLimitDown, + stringComparator + ); } @Override @@ -1722,18 +1545,11 @@ public class RowBasedGrouperHelper ) { this.keyBufferPosition = keyBufferPosition; - if (isPrimitiveComparable(pushLimitDown, stringComparator)) { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, 
rhsPosition) -> Double.compare( - lhsBuffer.getDouble(lhsPosition + keyBufferPosition), - rhsBuffer.getDouble(rhsPosition + keyBufferPosition) - ); - } else { - bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - double lhs = lhsBuffer.getDouble(lhsPosition + keyBufferPosition); - double rhs = rhsBuffer.getDouble(rhsPosition + keyBufferPosition); - return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs)); - }; - } + bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForDouble( + keyBufferPosition, + pushLimitDown, + stringComparator + ); } @Override @@ -1776,29 +1592,10 @@ public class RowBasedGrouperHelper { this.delegate = delegate; this.keyBufferPosition = keyBufferPosition; - BufferComparator delegateBufferComparator = this.delegate.getBufferComparator(); - this.comparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { - boolean isLhsNull = (lhsBuffer.get(lhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE); - boolean isRhsNull = (rhsBuffer.get(rhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE); - if (isLhsNull && isRhsNull) { - // Both are null - return 0; - } - // only lhs is null - if (isLhsNull) { - return -1; - } - // only rhs is null - if (isRhsNull) { - return 1; - } - return delegateBufferComparator.compare( - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - }; + this.comparator = GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData( + keyBufferPosition, + this.delegate.getBufferComparator() + ); } @Override @@ -1837,62 +1634,4 @@ public class RowBasedGrouperHelper } } } - - private static int compareDimsInBuffersForNullFudgeTimestamp( - BufferComparator[] serdeHelperComparators, - ByteBuffer lhsBuffer, - ByteBuffer rhsBuffer, - int lhsPosition, - int rhsPosition - ) - { - for (BufferComparator comparator : serdeHelperComparators) { - final int cmp = comparator.compare( - lhsBuffer, - rhsBuffer, - lhsPosition + Long.BYTES, - 
rhsPosition + Long.BYTES - ); - if (cmp != 0) { - return cmp; - } - } - - return 0; - } - - private static int compareDimsInBuffersForNullFudgeTimestampForPushDown( - BufferComparator[] serdeHelperComparators, - List<Boolean> needsReverses, - int dimCount, - ByteBuffer lhsBuffer, - ByteBuffer rhsBuffer, - int lhsPosition, - int rhsPosition - ) - { - for (int i = 0; i < dimCount; i++) { - final int cmp; - if (needsReverses.get(i)) { - cmp = serdeHelperComparators[i].compare( - rhsBuffer, - lhsBuffer, - rhsPosition + Long.BYTES, - lhsPosition + Long.BYTES - ); - } else { - cmp = serdeHelperComparators[i].compare( - lhsBuffer, - rhsBuffer, - lhsPosition + Long.BYTES, - rhsPosition + Long.BYTES - ); - } - if (cmp != 0) { - return cmp; - } - } - - return 0; - } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java index e1b664196c3..b6121af86a1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java @@ -23,11 +23,15 @@ import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.ordering.StringComparator; +import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.ArrayBasedIndexedInts; import org.apache.druid.segment.data.IndexedInts; +import
javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -48,6 +52,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin reverseDictionary.defaultReturnValue(-1); } + public DictionaryBuildingStringGroupByColumnSelectorStrategy() + { + super(null); + } + @Override public void processValueFromGroupingKey( GroupByColumnSelectorPlus selectorPlus, @@ -116,4 +125,17 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin return dictId; } } + + @Override + public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator) + { + final StringComparator realComparator = stringComparator == null ? + StringComparators.LEXICOGRAPHIC : + stringComparator; + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + String lhsStr = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)); + String rhsStr = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)); + return realComparator.compare(lhsStr, rhsStr); + }; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java index 6b3e577e218..86522b5b47b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java @@ -19,8 +19,10 @@ package org.apache.druid.query.groupby.epinephelinae.column; - import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils; +import org.apache.druid.query.ordering.StringComparator; import 
org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionHandlerUtils; @@ -90,4 +92,14 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto // this method handles row values after the first in a multivalued row, so just return false return false; } + + @Override + public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator) + { + return GrouperBufferComparatorUtils.makeBufferComparatorForDouble( + keyBufferPosition, + true, + stringComparator + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java index 453c282b841..49fc1fb11bd 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java @@ -20,6 +20,9 @@ package org.apache.druid.query.groupby.epinephelinae.column; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils; +import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionHandlerUtils; @@ -65,6 +68,19 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector keyBuffer.putFloat(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) obj)); } + @Override + public Grouper.BufferComparator bufferComparator( + int keyBufferPosition, + @Nullable StringComparator stringComparator + ) + { + return GrouperBufferComparatorUtils.makeBufferComparatorForFloat( + keyBufferPosition, + true, + stringComparator + ); 
+ } + @Override public void initGroupingKeyColumnValue( int keyBufferPosition, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java index 96655416268..4e91360297b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java @@ -21,8 +21,11 @@ package org.apache.druid.query.groupby.epinephelinae.column; import org.apache.druid.query.dimension.ColumnSelectorStrategy; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.segment.ColumnValueSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; /** @@ -136,4 +139,12 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy * @param keyBuffer grouping key */ void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer); + + /** + * Return BufferComparator for values written using this strategy when limit is pushed down to segment scan. 
+ * @param keyBufferPosition starting offset for this column's value within the grouping key + * @param stringComparator stringComparator from LimitSpec for this column + * @return BufferComparator for comparing values written + */ + Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java index 3c5f8fefe9b..b70c5c22896 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java @@ -20,6 +20,9 @@ package org.apache.druid.query.groupby.epinephelinae.column; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils; +import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionHandlerUtils; @@ -65,6 +68,19 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS keyBuffer.putLong(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) obj)); } + @Override + public Grouper.BufferComparator bufferComparator( + int keyBufferPosition, + @Nullable StringComparator stringComparator + ) + { + return GrouperBufferComparatorUtils.makeBufferComparatorForLong( + keyBufferPosition, + true, + stringComparator + ); + } + @Override public void initGroupingKeyColumnValue( int keyBufferPosition, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java index f0cfaf1ff28..3cbd6b8dc60 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java @@ -19,9 +19,11 @@ package org.apache.druid.query.groupby.epinephelinae.column; - import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils; +import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; @@ -88,6 +90,18 @@ public class NullableValueGroupByColumnSelectorStrategy implements GroupByColumn delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, obj, keyBuffer); } + @Override + public Grouper.BufferComparator bufferComparator( + int keyBufferPosition, + @Nullable StringComparator stringComparator + ) + { + return GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData( + keyBufferPosition, + delegate.bufferComparator(keyBufferPosition + Byte.BYTES, stringComparator) + ); + } + @Override public void initGroupingKeyColumnValue( int keyBufferPosition, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java index 65c48419a19..2959323c65d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java @@ -22,14 +22,27 @@ package org.apache.druid.query.groupby.epinephelinae.column; import com.google.common.base.Preconditions; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.ordering.StringComparator; +import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; +import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.function.IntFunction; public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy { + @Nullable + private final IntFunction<String> dictionaryLookup; + + public StringGroupByColumnSelectorStrategy(IntFunction<String> dictionaryLookup) + { + this.dictionaryLookup = dictionaryLookup; + } + @Override public int getGroupingKeySize() { @@ -131,4 +144,22 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto keyBuffer.putInt(keyBufferPosition, values.get(0)); } } + + @Override + public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator) + { + if (stringComparator == null || StringComparators.LEXICOGRAPHIC.equals(stringComparator)) { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Integer.compare( + lhsBuffer.getInt(lhsPosition + keyBufferPosition), + rhsBuffer.getInt(rhsPosition + keyBufferPosition) + ); + } else { + Preconditions.checkState(dictionaryLookup != null, "null dictionary lookup"); + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + String lhsStr = dictionaryLookup.apply(lhsBuffer.getInt(lhsPosition + keyBufferPosition)); + String rhsStr =
dictionaryLookup.apply(rhsBuffer.getInt(rhsPosition + keyBufferPosition)); + return stringComparator.compare(lhsStr, rhsStr); + }; + } + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java index 74d5e28c9aa..4cb5e804005 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java @@ -94,7 +94,8 @@ public class GroupByQueryConfigTest "groupByStrategy", "v1", "maxOnDiskStorage", 0, "maxResults", 2, - "maxMergingDictionarySize", 3 + "maxMergingDictionarySize", 3, + "applyLimitPushDownToSegment", false ) ) .build() @@ -108,5 +109,6 @@ public class GroupByQueryConfigTest Assert.assertEquals(0, config2.getMaxOnDiskStorage()); Assert.assertEquals(3, config2.getMaxMergingDictionarySize()); Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertFalse(config2.isApplyLimitPushDownToSegment()); } }