groupBy query: optional limit push down to segment scan (#8426)

* groupBy query: optional limit push down to segment scan

* make segment level limit push down configurable

* fix teamcity errors

* fix segment limit pushdown flag handling on query level config override

* use equals for comparator check

* fix sql and null handling

* fix unused imports

* handle null offset in NullableValueGroupByColumnSelectorStrategy for buffer comparator similar to RowBasedGrouperHelper.NullableRowBasedKeySerdeHelper
This commit is contained in:
Himanshu 2019-10-08 15:35:07 -07:00 committed by GitHub
parent d801ce2f29
commit c078ed40fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 710 additions and 317 deletions

View File

@ -412,6 +412,7 @@ Supported runtime properties:
|`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees will need fewer threads, which might help query performance by reducing the overhead of too many threads if the server has sufficiently powerful CPU cores.|8|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|
|`druid.query.groupBy.applyLimitPushDownToSegment`|If the Broker pushes the limit down to queryable nodes (Historicals, peons), then also limit results during the segment scan.|true (enabled)|
Supported query contexts:
@ -424,6 +425,7 @@ Supported query contexts:
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|`applyLimitPushDownToSegment`|If the Broker pushes the limit down to queryable nodes (Historicals, peons), then also limit results during the segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|
##### GroupBy v1 configurations

View File

@ -29,6 +29,7 @@ public class GroupByQueryConfig
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT = "applyLimitPushDownToSegment";
public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery";
public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery";
public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray";
@ -78,6 +79,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean forcePushDownLimit = false;
@JsonProperty
private boolean applyLimitPushDownToSegment = true;
@JsonProperty
private boolean forcePushDownNestedQuery = false;
@ -158,6 +162,11 @@ public class GroupByQueryConfig
return forcePushDownLimit;
}
/**
 * Returns the {@code applyLimitPushDownToSegment} setting held by this config.
 *
 * The query-specific config built in this class takes its value from the query context key
 * {@link #CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT}, passing this value as the default.
 *
 * @return the current value of the {@code applyLimitPushDownToSegment} field
 */
public boolean isApplyLimitPushDownToSegment()
{
return applyLimitPushDownToSegment;
}
public boolean isForceHashAggregation()
{
return forceHashAggregation;
@ -220,6 +229,10 @@ public class GroupByQueryConfig
getMaxMergingDictionarySize()
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.applyLimitPushDownToSegment = query.getContextBoolean(
CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT,
isApplyLimitPushDownToSegment()
);
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery());
newConfig.intermediateCombineDegree = query.getContextValue(

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
@ -50,7 +51,9 @@ import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSele
import org.apache.druid.query.groupby.epinephelinae.column.NullableValueGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
@ -340,7 +343,7 @@ public class GroupByQueryEngineV2
case STRING:
DimensionSelector dimSelector = (DimensionSelector) selector;
if (dimSelector.getValueCardinality() >= 0) {
return new StringGroupByColumnSelectorStrategy();
return new StringGroupByColumnSelectorStrategy(dimSelector::lookupName);
} else {
return new DictionaryBuildingStringGroupByColumnSelectorStrategy();
}
@ -393,7 +396,7 @@ public class GroupByQueryEngineV2
this.querySpecificConfig = querySpecificConfig;
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = new GroupByEngineKeySerde(dims);
this.keySerde = new GroupByEngineKeySerde(dims, query);
this.dims = dims;
// Time is the same for every row in the cursor
@ -516,6 +519,8 @@ public class GroupByQueryEngineV2
private static class HashAggregateIterator extends GroupByEngineIterator<ByteBuffer>
{
private static final Logger LOGGER = new Logger(HashAggregateIterator.class);
private final int[] stack;
private final Object[] valuess;
private final ByteBuffer keyBuffer;
@ -544,18 +549,54 @@ public class GroupByQueryEngineV2
@Override
protected Grouper<ByteBuffer> newGrouper()
{
return new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
AggregatorAdapters.factorizeBuffered(
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
true
);
Grouper grouper = null;
final DefaultLimitSpec limitSpec = query.isApplyLimitPushDown() &&
querySpecificConfig.isApplyLimitPushDownToSegment() ?
(DefaultLimitSpec) query.getLimitSpec() : null;
if (limitSpec != null) {
LimitedBufferHashGrouper limitGrouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
AggregatorAdapters.factorizeBuffered(
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
limitSpec.getLimit(),
DefaultLimitSpec.sortingOrderHasNonGroupingFields(
limitSpec,
query.getDimensions()
)
);
if (limitGrouper.validateBufferCapacity(buffer.capacity())) {
grouper = limitGrouper;
} else {
LOGGER.warn(
"Limit is not applied in segment scan phase due to limited buffer capacity for query [%s].",
query.getId()
);
}
}
if (grouper == null) {
grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
AggregatorAdapters.factorizeBuffered(
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
true
);
}
return grouper;
}
@Override
@ -808,14 +849,19 @@ public class GroupByQueryEngineV2
private static class GroupByEngineKeySerde implements Grouper.KeySerde<ByteBuffer>
{
private final int keySize;
private final GroupByColumnSelectorPlus[] dims;
private final GroupByQuery query;
public GroupByEngineKeySerde(final GroupByColumnSelectorPlus[] dims)
public GroupByEngineKeySerde(final GroupByColumnSelectorPlus[] dims, GroupByQuery query)
{
this.dims = dims;
int keySize = 0;
for (GroupByColumnSelectorPlus selectorPlus : dims) {
keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize();
}
this.keySize = keySize;
this.query = query;
}
@Override
@ -853,8 +899,15 @@ public class GroupByQueryEngineV2
@Override
public Grouper.BufferComparator bufferComparator()
{
// No sorting, let mergeRunners handle that
throw new UnsupportedOperationException();
Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down");
DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec();
return GrouperBufferComparatorUtils.bufferComparator(
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst(),
query.getDimensions().size(),
getDimensionComparators(limitSpec)
);
}
@Override
@ -863,8 +916,34 @@ public class GroupByQueryEngineV2
int[] aggregatorOffsets
)
{
// not called on this
throw new UnsupportedOperationException();
Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down");
DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec();
return GrouperBufferComparatorUtils.bufferComparatorWithAggregators(
query.getAggregatorSpecs().toArray(new AggregatorFactory[0]),
aggregatorOffsets,
limitSpec,
query.getDimensions(),
getDimensionComparators(limitSpec),
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst()
);
}
/**
 * Creates one buffer comparator per grouping dimension, in dimension order.
 *
 * For each dimension the string comparator is looked up from the limit spec by the dimension's output name
 * and handed, together with the dimension's key buffer position, to that dimension's column selector strategy.
 *
 * @param limitSpec limit spec used to resolve the per-dimension string comparator
 * @return array of comparators, indexed like {@code dims}
 */
private Grouper.BufferComparator[] getDimensionComparators(DefaultLimitSpec limitSpec)
{
  final Grouper.BufferComparator[] result = new Grouper.BufferComparator[dims.length];
  int idx = 0;
  while (idx < dims.length) {
    final String outputName = query.getDimensions().get(idx).getOutputName();
    final StringComparator ordering = DefaultLimitSpec.getComparatorForDimName(limitSpec, outputName);
    final GroupByColumnSelectorPlus dim = dims[idx];
    result[idx] = dim.getColumnSelectorStrategy().bufferComparator(dim.getKeyBufferPosition(), ordering);
    idx++;
  }
  return result;
}
@Override

View File

@ -0,0 +1,436 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.primitives.Longs;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Set of utility methods to facilitate implementation of {@link Grouper.KeySerde#bufferComparator()} and
* {@link Grouper.KeySerde#bufferComparatorWithAggregators(AggregatorFactory[], int[])}
*/
public class GrouperBufferComparatorUtils
{
/**
 * Builds a comparator over serialized grouping keys from the given per-dimension comparators.
 *
 * Without a timestamp, the first {@code dimCount} comparators are applied in order at the entry position.
 * With a timestamp, the long at the entry position is compared as the time and all dimension comparators
 * are applied at the entry position plus {@code Long.BYTES}; {@code sortByDimsFirst} selects whether the
 * dimensions or the time are compared first.
 *
 * @param includeTimestamp       whether each entry starts with a long timestamp
 * @param sortByDimsFirst        when a timestamp is included, compare dimensions before the timestamp
 * @param dimCount               number of comparators consulted when no timestamp is included
 * @param serdeHelperComparators per-dimension comparators, in comparison order
 * @return comparator returning the first non-zero component comparison, or 0 if all components tie
 */
public static Grouper.BufferComparator bufferComparator(
    boolean includeTimestamp,
    boolean sortByDimsFirst,
    int dimCount,
    Grouper.BufferComparator[] serdeHelperComparators
)
{
  if (!includeTimestamp) {
    // Entries hold dimensions only: walk the first dimCount comparators.
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
      int dimIdx = 0;
      while (dimIdx < dimCount) {
        final int dimCompare = serdeHelperComparators[dimIdx].compare(
            lhsBuffer,
            rhsBuffer,
            lhsPosition,
            rhsPosition
        );
        if (dimCompare != 0) {
          return dimCompare;
        }
        dimIdx++;
      }
      return 0;
    };
  }

  if (sortByDimsFirst) {
    // Dimensions decide first; the timestamp only breaks ties.
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
      final int dimCompare = compareDimsInBuffersForNullFudgeTimestamp(
          serdeHelperComparators,
          lhsBuffer,
          rhsBuffer,
          lhsPosition,
          rhsPosition
      );
      return dimCompare != 0
             ? dimCompare
             : Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
    };
  }

  // Timestamp decides first; dimensions only break ties.
  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
    return timeCompare != 0
           ? timeCompare
           : compareDimsInBuffersForNullFudgeTimestamp(
               serdeHelperComparators,
               lhsBuffer,
               rhsBuffer,
               lhsPosition,
               rhsPosition
           );
  };
}
/**
 * Builds a comparator that orders entries by the columns of {@code limitSpec}, which may name grouping
 * dimensions or numeric aggregators.
 *
 * Comparison order: every order-by column in the order the limit spec lists them (operands are swapped for
 * columns whose direction is not ascending), followed by every dimension the limit spec does not mention,
 * ascending. Order-by columns that match neither a dimension nor an aggregator are ignored.
 *
 * When {@code includeTimestamp} is set, the long at the entry position is compared as the time and all other
 * comparators are applied at the entry position plus {@code Long.BYTES}; {@code sortByDimsFirst} selects
 * whether the time is compared after or before the other fields.
 *
 * @param aggregatorFactories aggregators that order-by columns may refer to
 * @param aggregatorOffsets   per-aggregator offsets, indexed like {@code aggregatorFactories}
 * @param limitSpec           limit spec supplying the order-by columns
 * @param dimensions          grouping dimensions, indexed like {@code dimComparators}
 * @param dimComparators      per-dimension comparators
 * @param includeTimestamp    whether each entry starts with a long timestamp
 * @param sortByDimsFirst     when a timestamp is included, compare the other fields before the timestamp
 * @return comparator returning the first non-zero field comparison, or 0 if all fields tie
 * @throws IAE if an order-by column refers to an aggregator whose type is not numeric
 */
public static Grouper.BufferComparator bufferComparatorWithAggregators(
    AggregatorFactory[] aggregatorFactories,
    int[] aggregatorOffsets,
    DefaultLimitSpec limitSpec,
    List<DimensionSpec> dimensions,
    Grouper.BufferComparator[] dimComparators,
    boolean includeTimestamp,
    boolean sortByDimsFirst
)
{
  final int dimCount = dimensions.size();
  // comparators and needsReverses are appended to in lock-step, so index i of one describes index i of the other.
  final List<Boolean> needsReverses = new ArrayList<>();
  final List<Grouper.BufferComparator> comparators = new ArrayList<>();
  final Set<Integer> orderByIndices = new HashSet<>();

  for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
    final boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
    final int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
    if (dimIndex >= 0) {
      comparators.add(dimComparators[dimIndex]);
      orderByIndices.add(dimIndex);
      needsReverses.add(needsReverse);
      continue;
    }

    final int aggIndex = OrderByColumnSpec.getAggIndexForOrderBy(orderSpec, Arrays.asList(aggregatorFactories));
    if (aggIndex < 0) {
      // Column is neither a dimension nor an aggregator: it does not take part in the ordering.
      continue;
    }

    final StringComparator stringComparator = orderSpec.getDimensionComparator();
    final String typeName = aggregatorFactories[aggIndex].getTypeName();
    // The Integer.BYTES adjustment is carried over unchanged; its reason is not visible in this class.
    final int aggOffset = aggregatorOffsets[aggIndex] - Integer.BYTES;

    final ValueType valueType = ValueType.fromString(typeName);
    if (!ValueType.isNumeric(valueType)) {
      throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec);
    }
    // NOTE(review): the null-marker check and the value read both use aggOffset; confirm this matches the
    // aggregator buffer layout.
    comparators.add(makeNullHandlingBufferComparatorForNumericData(
        aggOffset,
        makeNumericBufferComparator(valueType, aggOffset, true, stringComparator)
    ));
    needsReverses.add(needsReverse);
  }

  for (int i = 0; i < dimCount; i++) {
    if (!orderByIndices.contains(i)) {
      comparators.add(dimComparators[i]);
      needsReverses.add(false); // default to Ascending order if dim is not in an orderby spec
    }
  }

  final Grouper.BufferComparator[] adjustedSerdeHelperComparators =
      comparators.toArray(new Grouper.BufferComparator[0]);
  // Use the number of comparators actually built. Deriving the count from dimCount plus the number of
  // aggregator order-bys undercounts when the limit spec names the same dimension more than once, which
  // would leave the trailing dimension comparators unused.
  final int fieldCount = adjustedSerdeHelperComparators.length;

  if (!includeTimestamp) {
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
      for (int i = 0; i < fieldCount; i++) {
        final int cmp;
        if (needsReverses.get(i)) {
          cmp = adjustedSerdeHelperComparators[i].compare(
              rhsBuffer,
              lhsBuffer,
              rhsPosition,
              lhsPosition
          );
        } else {
          cmp = adjustedSerdeHelperComparators[i].compare(
              lhsBuffer,
              rhsBuffer,
              lhsPosition,
              rhsPosition
          );
        }
        if (cmp != 0) {
          return cmp;
        }
      }
      return 0;
    };
  }

  if (sortByDimsFirst) {
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
      final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
          adjustedSerdeHelperComparators,
          needsReverses,
          fieldCount,
          lhsBuffer,
          rhsBuffer,
          lhsPosition,
          rhsPosition
      );
      if (cmp != 0) {
        return cmp;
      }
      return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
    };
  }

  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
    if (timeCompare != 0) {
      return timeCompare;
    }
    return compareDimsInBuffersForNullFudgeTimestampForPushDown(
        adjustedSerdeHelperComparators,
        needsReverses,
        fieldCount,
        lhsBuffer,
        rhsBuffer,
        lhsPosition,
        rhsPosition
    );
  };
}
/**
 * Applies the first {@code dimCount} comparators to two entries whose fields start {@code Long.BYTES} after
 * the given positions, swapping the operands for every field flagged in {@code needsReverses}.
 *
 * @param serdeHelperComparators field comparators, in comparison order
 * @param needsReverses          per-field flag, indexed like the comparators; true swaps the operands
 * @param dimCount               number of leading comparators to consult
 * @param lhsBuffer              buffer holding the left entry
 * @param rhsBuffer              buffer holding the right entry
 * @param lhsPosition            position of the left entry
 * @param rhsPosition            position of the right entry
 * @return the first non-zero field comparison, or 0 if all consulted fields tie
 */
private static int compareDimsInBuffersForNullFudgeTimestampForPushDown(
    Grouper.BufferComparator[] serdeHelperComparators,
    List<Boolean> needsReverses,
    int dimCount,
    ByteBuffer lhsBuffer,
    ByteBuffer rhsBuffer,
    int lhsPosition,
    int rhsPosition
)
{
  // Skip the leading long of each entry once, up front.
  final int lhsFieldStart = lhsPosition + Long.BYTES;
  final int rhsFieldStart = rhsPosition + Long.BYTES;

  int fieldIdx = 0;
  while (fieldIdx < dimCount) {
    final int result = needsReverses.get(fieldIdx)
                       ? serdeHelperComparators[fieldIdx].compare(rhsBuffer, lhsBuffer, rhsFieldStart, lhsFieldStart)
                       : serdeHelperComparators[fieldIdx].compare(lhsBuffer, rhsBuffer, lhsFieldStart, rhsFieldStart);
    if (result != 0) {
      return result;
    }
    fieldIdx++;
  }
  return 0;
}
/**
 * Applies every comparator, in array order, to two entries whose fields start {@code Long.BYTES} after the
 * given positions.
 *
 * @param serdeHelperComparators field comparators, in comparison order
 * @param lhsBuffer              buffer holding the left entry
 * @param rhsBuffer              buffer holding the right entry
 * @param lhsPosition            position of the left entry
 * @param rhsPosition            position of the right entry
 * @return the first non-zero field comparison, or 0 if all fields tie
 */
private static int compareDimsInBuffersForNullFudgeTimestamp(
    Grouper.BufferComparator[] serdeHelperComparators,
    ByteBuffer lhsBuffer,
    ByteBuffer rhsBuffer,
    int lhsPosition,
    int rhsPosition
)
{
  // Skip the leading long of each entry once, up front.
  final int lhsFieldStart = lhsPosition + Long.BYTES;
  final int rhsFieldStart = rhsPosition + Long.BYTES;

  for (int idx = 0; idx < serdeHelperComparators.length; idx++) {
    final int result = serdeHelperComparators[idx].compare(lhsBuffer, rhsBuffer, lhsFieldStart, rhsFieldStart);
    if (result != 0) {
      return result;
    }
  }
  return 0;
}
/**
 * Selects the long, float or double comparator factory matching {@code valueType}.
 *
 * @param valueType         numeric type of the stored value
 * @param keyBufferPosition offset of the value relative to the entry position
 * @param pushLimitDown     forwarded to the type-specific factory
 * @param stringComparator  forwarded to the type-specific factory; may be null
 * @return comparator for values of the given type
 * @throws IAE if {@code valueType} is not LONG, FLOAT or DOUBLE
 */
private static Grouper.BufferComparator makeNumericBufferComparator(
    ValueType valueType,
    int keyBufferPosition,
    boolean pushLimitDown,
    @Nullable StringComparator stringComparator
)
{
  final Grouper.BufferComparator numericComparator;
  switch (valueType) {
    case LONG:
      numericComparator = makeBufferComparatorForLong(keyBufferPosition, pushLimitDown, stringComparator);
      break;
    case FLOAT:
      numericComparator = makeBufferComparatorForFloat(keyBufferPosition, pushLimitDown, stringComparator);
      break;
    case DOUBLE:
      numericComparator = makeBufferComparatorForDouble(keyBufferPosition, pushLimitDown, stringComparator);
      break;
    default:
      throw new IAE("invalid type: %s", valueType);
  }
  return numericComparator;
}
/**
 * Creates a comparator for long values stored {@code keyBufferPosition} bytes after the entry position.
 *
 * Values are compared numerically when {@code isPrimitiveComparable} accepts the arguments; otherwise both
 * values are converted with {@code String.valueOf} and compared with {@code stringComparator}.
 *
 * @param keyBufferPosition offset of the value relative to the entry position
 * @param pushLimitDown     whether limit push down is in effect
 * @param stringComparator  requested ordering; may be null
 * @return comparator over the stored long values
 */
public static Grouper.BufferComparator makeBufferComparatorForLong(
    int keyBufferPosition,
    boolean pushLimitDown,
    @Nullable StringComparator stringComparator
)
{
  if (!isPrimitiveComparable(pushLimitDown, stringComparator)) {
    // A non-numeric ordering was requested: compare the decimal string forms.
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> stringComparator.compare(
        String.valueOf(lhsBuffer.getLong(lhsPosition + keyBufferPosition)),
        String.valueOf(rhsBuffer.getLong(rhsPosition + keyBufferPosition))
    );
  }

  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final long left = lhsBuffer.getLong(lhsPosition + keyBufferPosition);
    final long right = rhsBuffer.getLong(rhsPosition + keyBufferPosition);
    return Longs.compare(left, right);
  };
}
/**
 * Creates a comparator for double values stored {@code keyBufferPosition} bytes after the entry position.
 *
 * Values are compared with {@code Double.compare} when {@code isPrimitiveComparable} accepts the arguments;
 * otherwise both values are converted with {@code String.valueOf} and compared with {@code stringComparator}.
 *
 * @param keyBufferPosition offset of the value relative to the entry position
 * @param pushLimitDown     whether limit push down is in effect
 * @param stringComparator  requested ordering; may be null
 * @return comparator over the stored double values
 */
public static Grouper.BufferComparator makeBufferComparatorForDouble(
    int keyBufferPosition,
    boolean pushLimitDown,
    @Nullable StringComparator stringComparator
)
{
  if (!isPrimitiveComparable(pushLimitDown, stringComparator)) {
    // A non-numeric ordering was requested: compare the string forms.
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> stringComparator.compare(
        String.valueOf(lhsBuffer.getDouble(lhsPosition + keyBufferPosition)),
        String.valueOf(rhsBuffer.getDouble(rhsPosition + keyBufferPosition))
    );
  }

  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final double left = lhsBuffer.getDouble(lhsPosition + keyBufferPosition);
    final double right = rhsBuffer.getDouble(rhsPosition + keyBufferPosition);
    return Double.compare(left, right);
  };
}
/**
 * Creates a comparator for float values stored {@code keyBufferPosition} bytes after the entry position.
 *
 * Values are compared with {@code Float.compare} when {@code isPrimitiveComparable} accepts the arguments;
 * otherwise both values are converted with {@code String.valueOf} and compared with {@code stringComparator}.
 *
 * @param keyBufferPosition offset of the value relative to the entry position
 * @param pushLimitDown     whether limit push down is in effect
 * @param stringComparator  requested ordering; may be null
 * @return comparator over the stored float values
 */
public static Grouper.BufferComparator makeBufferComparatorForFloat(
    int keyBufferPosition,
    boolean pushLimitDown,
    @Nullable StringComparator stringComparator
)
{
  if (!isPrimitiveComparable(pushLimitDown, stringComparator)) {
    // A non-numeric ordering was requested: compare the string forms.
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> stringComparator.compare(
        String.valueOf(lhsBuffer.getFloat(lhsPosition + keyBufferPosition)),
        String.valueOf(rhsBuffer.getFloat(rhsPosition + keyBufferPosition))
    );
  }

  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final float left = lhsBuffer.getFloat(lhsPosition + keyBufferPosition);
    final float right = rhsBuffer.getFloat(rhsPosition + keyBufferPosition);
    return Float.compare(left, right);
  };
}
/**
 * Wraps {@code delegate} with a null check on the byte stored {@code keyBufferPosition} bytes after the
 * entry position.
 *
 * An entry whose byte equals {@code NullHandling.IS_NULL_BYTE} counts as null. Two nulls tie, a null sorts
 * before a non-null, and only two non-null entries are passed on to {@code delegate} (with the original,
 * unshifted positions).
 *
 * @param keyBufferPosition offset of the null-marker byte relative to the entry position
 * @param delegate          comparator used when neither side is null
 * @return null-aware comparator
 */
public static Grouper.BufferComparator makeNullHandlingBufferComparatorForNumericData(
    int keyBufferPosition,
    Grouper.BufferComparator delegate
)
{
  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final boolean leftIsNull = lhsBuffer.get(lhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE;
    final boolean rightIsNull = rhsBuffer.get(rhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE;

    if (leftIsNull) {
      // Both null -> tie; only the left side null -> left sorts first.
      return rightIsNull ? 0 : -1;
    }
    if (rightIsNull) {
      // Only the right side is null -> right sorts first.
      return 1;
    }
    return delegate.compare(lhsBuffer, rhsBuffer, lhsPosition, rhsPosition);
  };
}
/**
 * Decides whether stored numeric values can be compared directly as primitives.
 *
 * @param pushLimitDown    whether limit push down is in effect
 * @param stringComparator requested ordering; may be null
 * @return true when limit push down is off, no comparator was given, or the comparator equals
 *         {@code StringComparators.NUMERIC}
 */
private static boolean isPrimitiveComparable(boolean pushLimitDown, @Nullable StringComparator stringComparator)
{
  if (!pushLimitDown) {
    return true;
  }
  if (stringComparator == null) {
    return true;
  }
  return stringComparator.equals(StringComparators.NUMERIC);
}
}

View File

@ -1171,57 +1171,12 @@ public class RowBasedGrouperHelper
initializeRankOfDictionaryIds();
}
if (includeTimestamp) {
if (sortByDimsFirst) {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
serdeHelperComparators,
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
if (cmp != 0) {
return cmp;
}
return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
};
} else {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
if (timeCompare != 0) {
return timeCompare;
}
return compareDimsInBuffersForNullFudgeTimestamp(
serdeHelperComparators,
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
};
}
} else {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
for (int i = 0; i < dimCount; i++) {
final int cmp = serdeHelperComparators[i].compare(
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
if (cmp != 0) {
return cmp;
}
}
return 0;
};
}
return GrouperBufferComparatorUtils.bufferComparator(
includeTimestamp,
sortByDimsFirst,
dimCount,
serdeHelperComparators
);
}
@Override
@ -1230,127 +1185,15 @@ public class RowBasedGrouperHelper
int[] aggregatorOffsets
)
{
final List<RowBasedKeySerdeHelper> adjustedSerdeHelpers;
final List<Boolean> needsReverses = new ArrayList<>();
List<RowBasedKeySerdeHelper> orderByHelpers = new ArrayList<>();
List<RowBasedKeySerdeHelper> otherDimHelpers = new ArrayList<>();
Set<Integer> orderByIndices = new HashSet<>();
int aggCount = 0;
boolean needsReverse;
for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
RowBasedKeySerdeHelper serdeHelper = serdeHelpers[dimIndex];
orderByHelpers.add(serdeHelper);
orderByIndices.add(dimIndex);
needsReverses.add(needsReverse);
} else {
int aggIndex = OrderByColumnSpec.getAggIndexForOrderBy(orderSpec, Arrays.asList(aggregatorFactories));
if (aggIndex >= 0) {
final RowBasedKeySerdeHelper serdeHelper;
final StringComparator stringComparator = orderSpec.getDimensionComparator();
final String typeName = aggregatorFactories[aggIndex].getTypeName();
final int aggOffset = aggregatorOffsets[aggIndex] - Integer.BYTES;
aggCount++;
final ValueType valueType = ValueType.fromString(typeName);
if (!ValueType.isNumeric(valueType)) {
throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec);
}
serdeHelper = makeNullHandlingNumericserdeHelper(valueType, aggOffset, true, stringComparator);
orderByHelpers.add(serdeHelper);
needsReverses.add(needsReverse);
}
}
}
for (int i = 0; i < dimCount; i++) {
if (!orderByIndices.contains(i)) {
otherDimHelpers.add(serdeHelpers[i]);
needsReverses.add(false); // default to Ascending order if dim is not in an orderby spec
}
}
adjustedSerdeHelpers = orderByHelpers;
adjustedSerdeHelpers.addAll(otherDimHelpers);
final BufferComparator[] adjustedSerdeHelperComparators = new BufferComparator[adjustedSerdeHelpers.size()];
Arrays.setAll(adjustedSerdeHelperComparators, i -> adjustedSerdeHelpers.get(i).getBufferComparator());
final int fieldCount = dimCount + aggCount;
if (includeTimestamp) {
if (sortByDimsFirst) {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
adjustedSerdeHelperComparators,
needsReverses,
fieldCount,
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
if (cmp != 0) {
return cmp;
}
return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
};
} else {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
if (timeCompare != 0) {
return timeCompare;
}
int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
adjustedSerdeHelperComparators,
needsReverses,
fieldCount,
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
return cmp;
};
}
} else {
return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
for (int i = 0; i < fieldCount; i++) {
final int cmp;
if (needsReverses.get(i)) {
cmp = adjustedSerdeHelperComparators[i].compare(
rhsBuffer,
lhsBuffer,
rhsPosition,
lhsPosition
);
} else {
cmp = adjustedSerdeHelperComparators[i].compare(
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
}
if (cmp != 0) {
return cmp;
}
}
return 0;
};
}
return GrouperBufferComparatorUtils.bufferComparatorWithAggregators(
aggregatorFactories,
aggregatorOffsets,
limitSpec,
dimensions,
serdeHelperComparators,
includeTimestamp,
sortByDimsFirst
);
}
@Override
@ -1478,11 +1321,6 @@ public class RowBasedGrouperHelper
}
}
private static boolean isPrimitiveComparable(boolean pushLimitDown, @Nullable StringComparator stringComparator)
{
return !pushLimitDown || stringComparator == null || stringComparator.equals(StringComparators.NUMERIC);
}
private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
{
final int keyBufferPosition;
@ -1617,19 +1455,11 @@ public class RowBasedGrouperHelper
)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Longs.compare(
lhsBuffer.getLong(lhsPosition + keyBufferPosition),
rhsBuffer.getLong(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
long lhs = lhsBuffer.getLong(lhsPosition + keyBufferPosition);
long rhs = rhsBuffer.getLong(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForLong(
keyBufferPosition,
pushLimitDown,
stringComparator
);
}
@Override
@ -1670,18 +1500,11 @@ public class RowBasedGrouperHelper
)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Float.compare(
lhsBuffer.getFloat(lhsPosition + keyBufferPosition),
rhsBuffer.getFloat(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
float lhs = lhsBuffer.getFloat(lhsPosition + keyBufferPosition);
float rhs = rhsBuffer.getFloat(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForFloat(
keyBufferPosition,
pushLimitDown,
stringComparator
);
}
@Override
@ -1722,18 +1545,11 @@ public class RowBasedGrouperHelper
)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Double.compare(
lhsBuffer.getDouble(lhsPosition + keyBufferPosition),
rhsBuffer.getDouble(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
double lhs = lhsBuffer.getDouble(lhsPosition + keyBufferPosition);
double rhs = rhsBuffer.getDouble(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
bufferComparator = GrouperBufferComparatorUtils.makeBufferComparatorForDouble(
keyBufferPosition,
pushLimitDown,
stringComparator
);
}
@Override
@ -1776,29 +1592,10 @@ public class RowBasedGrouperHelper
{
this.delegate = delegate;
this.keyBufferPosition = keyBufferPosition;
BufferComparator delegateBufferComparator = this.delegate.getBufferComparator();
this.comparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
boolean isLhsNull = (lhsBuffer.get(lhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE);
boolean isRhsNull = (rhsBuffer.get(rhsPosition + keyBufferPosition) == NullHandling.IS_NULL_BYTE);
if (isLhsNull && isRhsNull) {
// Both are null
return 0;
}
// only lhs is null
if (isLhsNull) {
return -1;
}
// only rhs is null
if (isRhsNull) {
return 1;
}
return delegateBufferComparator.compare(
lhsBuffer,
rhsBuffer,
lhsPosition,
rhsPosition
);
};
this.comparator = GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData(
keyBufferPosition,
this.delegate.getBufferComparator()
);
}
@Override
@ -1837,62 +1634,4 @@ public class RowBasedGrouperHelper
}
}
}
private static int compareDimsInBuffersForNullFudgeTimestamp(
BufferComparator[] serdeHelperComparators,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (BufferComparator comparator : serdeHelperComparators) {
final int cmp = comparator.compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Long.BYTES,
rhsPosition + Long.BYTES
);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
private static int compareDimsInBuffersForNullFudgeTimestampForPushDown(
BufferComparator[] serdeHelperComparators,
List<Boolean> needsReverses,
int dimCount,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (int i = 0; i < dimCount; i++) {
final int cmp;
if (needsReverses.get(i)) {
cmp = serdeHelperComparators[i].compare(
rhsBuffer,
lhsBuffer,
rhsPosition + Long.BYTES,
lhsPosition + Long.BYTES
);
} else {
cmp = serdeHelperComparators[i].compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Long.BYTES,
rhsPosition + Long.BYTES
);
}
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}

View File

@ -23,11 +23,15 @@ import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -48,6 +52,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
reverseDictionary.defaultReturnValue(-1);
}
/**
 * Creates the strategy, passing {@code null} to the superclass constructor.
 *
 * The {@code bufferComparator} override in this class resolves ids through this class's
 * own {@code dictionary}, so it does not use whatever the superclass keeps from that argument.
 */
public DictionaryBuildingStringGroupByColumnSelectorStrategy()
{
super(null);
}
@Override
public void processValueFromGroupingKey(
GroupByColumnSelectorPlus selectorPlus,
@ -116,4 +125,17 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
return dictId;
}
}
/**
 * Builds a comparator that orders grouping keys by the string each stored id maps to
 * in this strategy's {@code dictionary}.
 *
 * @param keyBufferPosition offset of this column's int id within a grouping key
 * @param stringComparator  ordering to apply to the looked-up strings; when null,
 *                          {@link StringComparators#LEXICOGRAPHIC} is used
 * @return comparator that reads both ids, looks up their strings and compares them
 */
@Override
public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
{
  final StringComparator effectiveComparator;
  if (stringComparator != null) {
    effectiveComparator = stringComparator;
  } else {
    effectiveComparator = StringComparators.LEXICOGRAPHIC;
  }

  // Ids are always translated back to strings before comparing.
  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> effectiveComparator.compare(
      dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
      dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
  );
}
}

View File

@ -19,8 +19,10 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -90,4 +92,14 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto
// this method handles row values after the first in a multivalued row, so just return false
return false;
}
/**
 * Builds the buffer comparator for double values written by this strategy by delegating to
 * {@code GrouperBufferComparatorUtils.makeBufferComparatorForDouble}.
 *
 * NOTE(review): the literal {@code true} passed as the second argument is not explained by
 * the visible code -- confirm its meaning against the utility's signature.
 *
 * @param keyBufferPosition offset of this column's value within a grouping key
 * @param stringComparator  comparator forwarded unchanged to the utility; may be null
 * @return the comparator produced by the utility
 */
@Override
public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
{
  final Grouper.BufferComparator doubleComparator =
      GrouperBufferComparatorUtils.makeBufferComparatorForDouble(keyBufferPosition, true, stringComparator);
  return doubleComparator;
}
}

View File

@ -20,6 +20,9 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -65,6 +68,19 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector
keyBuffer.putFloat(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) obj));
}
/**
 * Builds the buffer comparator for float values written by this strategy by delegating to
 * {@code GrouperBufferComparatorUtils.makeBufferComparatorForFloat}.
 *
 * NOTE(review): the literal {@code true} passed as the second argument is not explained by
 * the visible code -- confirm its meaning against the utility's signature.
 *
 * @param keyBufferPosition offset of this column's value within a grouping key
 * @param stringComparator  comparator forwarded unchanged to the utility; may be null
 * @return the comparator produced by the utility
 */
@Override
public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
{
  final Grouper.BufferComparator floatComparator =
      GrouperBufferComparatorUtils.makeBufferComparatorForFloat(keyBufferPosition, true, stringComparator);
  return floatComparator;
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,

View File

@ -21,8 +21,11 @@ package org.apache.druid.query.groupby.epinephelinae.column;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
@ -136,4 +139,12 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
* @param keyBuffer grouping key
*/
void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer);
/**
 * Returns a {@link Grouper.BufferComparator} for values written to the grouping key by this strategy.
 * It is used when the limit is pushed down to the segment scan.
 *
 * @param keyBufferPosition starting offset for this column's value within the grouping key
 * @param stringComparator  stringComparator from LimitSpec for this column; may be null, in which case
 *                          each implementation decides which ordering to apply
 * @return BufferComparator for comparing values written by this strategy
 */
Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator);
}

View File

@ -20,6 +20,9 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -65,6 +68,19 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS
keyBuffer.putLong(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) obj));
}
/**
 * Builds the buffer comparator for long values written by this strategy by delegating to
 * {@code GrouperBufferComparatorUtils.makeBufferComparatorForLong}.
 *
 * NOTE(review): the literal {@code true} passed as the second argument is not explained by
 * the visible code -- confirm its meaning against the utility's signature.
 *
 * @param keyBufferPosition offset of this column's value within a grouping key
 * @param stringComparator  comparator forwarded unchanged to the utility; may be null
 * @return the comparator produced by the utility
 */
@Override
public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
{
  final Grouper.BufferComparator longComparator =
      GrouperBufferComparatorUtils.makeBufferComparatorForLong(keyBufferPosition, true, stringComparator);
  return longComparator;
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,

View File

@ -19,9 +19,11 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
@ -88,6 +90,18 @@ public class NullableValueGroupByColumnSelectorStrategy implements GroupByColumn
delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, obj, keyBuffer);
}
/**
 * Builds a comparator for values written by this strategy by wrapping the delegate's
 * comparator with {@code GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData}.
 *
 * The delegate's comparator is created for offset {@code keyBufferPosition + Byte.BYTES},
 * while the wrapper is created for {@code keyBufferPosition} itself.
 * NOTE(review): presumably the byte at {@code keyBufferPosition} is a null marker that the
 * wrapper inspects before delegating -- confirm against the utility.
 *
 * @param keyBufferPosition offset of this column's entry within a grouping key
 * @param stringComparator  comparator forwarded unchanged to the delegate; may be null
 * @return the wrapped comparator
 */
@Override
public Grouper.BufferComparator bufferComparator(
    int keyBufferPosition,
    @Nullable StringComparator stringComparator
)
{
  // The delegate's value sits one byte past the start of this column's entry.
  final Grouper.BufferComparator valueComparator =
      delegate.bufferComparator(keyBufferPosition + Byte.BYTES, stringComparator);
  return GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData(
      keyBufferPosition,
      valueComparator
  );
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,

View File

@ -22,14 +22,27 @@ package org.apache.druid.query.groupby.epinephelinae.column;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.function.IntFunction;
public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{
/**
 * Maps a dictionary id stored in the grouping key back to its string value.
 * May be null; {@code bufferComparator} only requires it when asked for an ordering
 * other than the default/lexicographic one.
 */
@Nullable
private final IntFunction<String> dictionaryLookup;

/**
 * Creates the strategy.
 *
 * @param dictionaryLookup function resolving a dictionary id to its string value, or null
 *                         when no lookup is available (the field is declared nullable and
 *                         {@code bufferComparator} checks for null before using it)
 */
public StringGroupByColumnSelectorStrategy(@Nullable IntFunction<String> dictionaryLookup)
{
  this.dictionaryLookup = dictionaryLookup;
}
@Override
public int getGroupingKeySize()
{
@ -131,4 +144,22 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto
keyBuffer.putInt(keyBufferPosition, values.get(0));
}
}
/**
 * Builds a comparator for the int dictionary ids this strategy writes into the grouping key.
 *
 * When {@code stringComparator} is null or equals {@link StringComparators#LEXICOGRAPHIC},
 * the stored ids are compared directly as ints. Otherwise each id is resolved through
 * {@code dictionaryLookup} and the resulting strings are compared with {@code stringComparator}.
 * NOTE(review): the direct id comparison presumably relies on ids being assigned in
 * lexicographic order of their values -- confirm for every selector this strategy is used with.
 *
 * @param keyBufferPosition offset of this column's int id within a grouping key
 * @param stringComparator  requested string ordering; may be null
 * @return comparator over the stored ids, or over the strings they resolve to
 * @throws IllegalStateException if a non-lexicographic comparator is requested but
 *                               {@code dictionaryLookup} is null
 */
@Override
public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
{
  final boolean compareByDictionaryId =
      stringComparator == null || StringComparators.LEXICOGRAPHIC.equals(stringComparator);

  if (!compareByDictionaryId) {
    // A custom ordering needs the actual strings, so the lookup must be present.
    Preconditions.checkState(dictionaryLookup != null, "null dictionary lookup");
    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> stringComparator.compare(
        dictionaryLookup.apply(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
        dictionaryLookup.apply(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
    );
  }

  return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
    final int lhsId = lhsBuffer.getInt(lhsPosition + keyBufferPosition);
    final int rhsId = rhsBuffer.getInt(rhsPosition + keyBufferPosition);
    return Integer.compare(lhsId, rhsId);
  };
}
}

View File

@ -94,7 +94,8 @@ public class GroupByQueryConfigTest
"groupByStrategy", "v1",
"maxOnDiskStorage", 0,
"maxResults", 2,
"maxMergingDictionarySize", 3
"maxMergingDictionarySize", 3,
"applyLimitPushDownToSegment", false
)
)
.build()
@ -108,5 +109,6 @@ public class GroupByQueryConfigTest
Assert.assertEquals(0, config2.getMaxOnDiskStorage());
Assert.assertEquals(3, config2.getMaxMergingDictionarySize());
Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertFalse(config2.isApplyLimitPushDownToSegment());
}
}