MSQ: Add limitHint to global-sort shuffles. (#16911)

* MSQ: Add limitHint to global-sort shuffles.

This allows pushing down limits into the SuperSorter.

* Test fixes.

* Add limitSpec to ScanQueryKit. Fix SuperSorter tracking.
Gian Merlino 2024-09-03 09:05:29 -07:00, committed by GitHub
parent 70bad948e3
commit 786c959e9e
17 changed files with 280 additions and 76 deletions
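
In rough terms (a hedged sketch assembled from the diffs below, not an exact call trace), the limit hint flows through four layers:

    // 1. Query kits derive a hint from the query's limit/offset, e.g. in GroupByQueryKit:
    //      postAggregationLimitHint = limitSpec.getOffset() + limitSpec.getLimit();
    // 2. Factories attach the hint to the shuffle spec:
    //      ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint)
    //        -> new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate, limitHint)
    // 3. Workers pass it to the sorter in RunWorkOrder, replacing a hard-coded -1:
    //      stageDefinition.getShuffleSpec().limitHint()
    // 4. SuperSorter runs final-layer mergers one at a time, decrementing rowLimit as
    //    rows are emitted and producing empty (nil) channels once it reaches zero.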

View File: spotbugs-exclude.xml

@@ -46,6 +46,7 @@
     <Or>
         <Class name="org.apache.druid.jackson.DefaultTrueJsonIncludeFilter"/>
         <Class name="org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter"/>
+        <Class name="org.apache.druid.msq.kernel.LimitHintJsonIncludeFilter"/>
         <Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
         <Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
         <Class name="org.apache.druid.query.scan.ScanQuery$BatchSizeJsonIncludeFilter"/>

View File: RunWorkOrder.java

@@ -738,7 +738,7 @@ public class RunWorkOrder
             makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
             memoryParameters.getSuperSorterMaxActiveProcessors(),
             memoryParameters.getSuperSorterMaxChannelsPerProcessor(),
-            -1,
+            stageDefinition.getShuffleSpec().limitHint(),
             cancellationId,
             counterTracker.sortProgress(),
             removeNullBytes
@@ -871,7 +871,7 @@ public class RunWorkOrder
             makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
             1,
             2,
-            -1,
+            ShuffleSpec.UNLIMITED,
             cancellationId,
             // Tracker is not actually tracked, since it doesn't quite fit into the way we report counters.

View File: GlobalSortMaxCountShuffleSpec.java

@@ -43,17 +43,20 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
   private final ClusterBy clusterBy;
   private final int maxPartitions;
   private final boolean aggregate;
+  private final long limitHint;

   @JsonCreator
   public GlobalSortMaxCountShuffleSpec(
       @JsonProperty("clusterBy") final ClusterBy clusterBy,
       @JsonProperty("partitions") final int maxPartitions,
-      @JsonProperty("aggregate") final boolean aggregate
+      @JsonProperty("aggregate") final boolean aggregate,
+      @JsonProperty("limitHint") final Long limitHint
   )
   {
     this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy");
     this.maxPartitions = maxPartitions;
     this.aggregate = aggregate;
+    this.limitHint = limitHint == null ? UNLIMITED : limitHint;

     if (maxPartitions < 1) {
       throw new IAE("Partition count must be at least 1");
@@ -133,6 +136,14 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
     return maxPartitions;
   }

+  @Override
+  @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitHintJsonIncludeFilter.class)
+  @JsonProperty
+  public long limitHint()
+  {
+    return limitHint;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -145,22 +156,24 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
     GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o;
     return maxPartitions == that.maxPartitions
            && aggregate == that.aggregate
-           && Objects.equals(clusterBy, that.clusterBy);
+           && Objects.equals(clusterBy, that.clusterBy)
+           && Objects.equals(limitHint, that.limitHint);
   }

   @Override
   public int hashCode()
   {
-    return Objects.hash(clusterBy, maxPartitions, aggregate);
+    return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint);
   }

   @Override
   public String toString()
   {
-    return "MaxCountShuffleSpec{" +
+    return "GlobalSortMaxCountShuffleSpec{" +
            "clusterBy=" + clusterBy +
-           ", partitions=" + maxPartitions +
+           ", maxPartitions=" + maxPartitions +
            ", aggregate=" + aggregate +
+           ", limitHint=" + limitHint +
            '}';
   }
 }

View File: HashShuffleSpec.java

@@ -65,5 +65,4 @@ public class HashShuffleSpec implements ShuffleSpec
   {
     return numPartitions;
   }
 }

View File: LimitHintJsonIncludeFilter.java (new file)

@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.msq.kernel;

import com.fasterxml.jackson.annotation.JsonInclude;

/**
 * {@link JsonInclude} filter for {@link ShuffleSpec#limitHint()}.
 *
 * This API works by "creative" use of equals. It requires warnings to be suppressed
 * and also requires spotbugs exclusions (see spotbugs-exclude.xml).
 */
@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode"})
public class LimitHintJsonIncludeFilter
{
  @Override
  public boolean equals(Object obj)
  {
    return obj instanceof Long && (Long) obj == ShuffleSpec.UNLIMITED;
  }
}
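
To see what the "creative" use of equals buys: with JsonInclude.Include.CUSTOM, Jackson instantiates the valueFilter class via its no-arg constructor and calls its equals() against each candidate property value, omitting the property when equals() returns true. A minimal sketch (the Example bean is hypothetical; only the filter and ShuffleSpec.UNLIMITED come from this commit):

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical demonstration bean; not part of the commit.
    class Example
    {
      @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitHintJsonIncludeFilter.class)
      @JsonProperty
      public long limitHint = ShuffleSpec.UNLIMITED;
    }

    // new ObjectMapper().writeValueAsString(new Example()) serializes to "{}":
    // the filter's equals(-1L) returns true, so Jackson drops the field. With
    // limitHint = 100 it serializes to {"limitHint":100}, since equals(100L) is false.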

View File: ShuffleSpec.java

@@ -37,6 +37,8 @@ import org.apache.druid.frame.key.ClusterBy;
 })
 public interface ShuffleSpec
 {
+  long UNLIMITED = -1;
+
   /**
    * The nature of this shuffle: hash vs. range based partitioning; whether the data are sorted or not.
    *
@@ -68,4 +70,17 @@ public interface ShuffleSpec
    * @throws IllegalStateException if kind is {@link ShuffleKind#GLOBAL_SORT} with more than one target partition
    */
   int partitionCount();
+
+  /**
+   * Limit that can be applied during shuffling. This is provided to enable performance optimizations.
+   *
+   * Implementations may apply this limit to each partition individually, or may apply it to the entire result set
+   * (across all partitions). Either approach is valid, so downstream logic must be able to handle both.
+   *
+   * Implementations may also ignore this hint completely, or may apply a limit that is somewhat higher than this hint.
+   */
+  default long limitHint()
+  {
+    return UNLIMITED;
+  }
 }
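
Because the hint is advisory in both directions (it may be applied per partition, across all partitions, loosely, or not at all), exact limit semantics still have to be enforced downstream; in MSQ that remains the job of the separate limit stage (note the OffsetLimitFrameProcessorFactory import in MSQTaskReportTest below). A hedged consumer-side sketch, with a hypothetical helper name:

    // The hint only permits upstream truncation; it guarantees nothing to readers.
    final long hint = shuffleSpec.limitHint();
    // Correct regardless of whether a hint != ShuffleSpec.UNLIMITED was honored upstream:
    applyExactOffsetAndLimit(sortedRows, offset, limit); // hypothetical downstream step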

View File: ShuffleSpecFactories.java

@@ -22,6 +22,7 @@ package org.apache.druid.msq.querykit;

 import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
 import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;

 /**
  * Static factory methods for common implementations of {@link ShuffleSpecFactory}.
@@ -37,10 +38,21 @@ public class ShuffleSpecFactories
    * Factory that produces a single output partition, which may or may not be sorted.
    */
   public static ShuffleSpecFactory singlePartition()
+  {
+    return singlePartitionWithLimit(ShuffleSpec.UNLIMITED);
+  }
+
+  /**
+   * Factory that produces a single output partition, which may or may not be sorted.
+   *
+   * @param limitHint limit that can be applied during shuffling. May not actually be applied; this is just an
+   *                  optional optimization. See {@link ShuffleSpec#limitHint()}.
+   */
+  public static ShuffleSpecFactory singlePartitionWithLimit(final long limitHint)
   {
     return (clusterBy, aggregate) -> {
       if (clusterBy.sortable() && !clusterBy.isEmpty()) {
-        return new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate);
+        return new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate, limitHint);
       } else {
         return MixShuffleSpec.instance();
       }
@@ -52,7 +64,8 @@ public class ShuffleSpecFactories
    */
   public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int partitions)
   {
-    return (clusterBy, aggregate) -> new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate);
+    return (clusterBy, aggregate) ->
+        new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate, ShuffleSpec.UNLIMITED);
   }

   /**
@@ -61,10 +74,6 @@ public class ShuffleSpecFactories
   public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
   {
     return (clusterBy, aggregate) ->
-        new GlobalSortTargetSizeShuffleSpec(
-            clusterBy,
-            targetSize,
-            aggregate
-        );
+        new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
   }
 }
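
A hedged usage sketch of the new factory method (the clusterBy variable is illustrative):

    // Build a single-partition global sort that may stop after roughly 100 rows.
    final ShuffleSpecFactory factory = ShuffleSpecFactories.singlePartitionWithLimit(100);
    final ShuffleSpec spec = factory.build(clusterBy, false);
    // Sortable, non-empty clusterBy -> new GlobalSortMaxCountShuffleSpec(clusterBy, 1, false, 100)
    // Otherwise                     -> MixShuffleSpec.instance(), which carries no hint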

View File: GroupByQueryKit.java

@@ -112,13 +112,25 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
     final ShuffleSpecFactory shuffleSpecFactoryPostAggregation;
     boolean partitionBoost;

+    // limitHint to use for the shuffle after the post-aggregation stage.
+    // Don't apply limitHint pre-aggregation, because results from pre-aggregation may not be fully grouped.
+    final long postAggregationLimitHint;
+
+    if (doLimitOrOffset) {
+      final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
+      postAggregationLimitHint =
+          limitSpec.isLimited() ? limitSpec.getOffset() + limitSpec.getLimit() : ShuffleSpec.UNLIMITED;
+    } else {
+      postAggregationLimitHint = ShuffleSpec.UNLIMITED;
+    }
+
     if (intermediateClusterBy.isEmpty() && resultClusterByWithoutPartitionBoost.isEmpty()) {
       // Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort.
       // This condition will be triggered when we don't have a grouping dimension, no partitioning granularity
       // (PARTITIONED BY ALL) and no ordering/clustering dimensions
       // For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY ALL
       shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition();
-      shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition();
+      shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
       partitionBoost = false;
     } else if (doOrderBy) {
       // There can be a situation where intermediateClusterBy is empty, while the resultClusterBy is non-empty
@@ -130,9 +142,13 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
       shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
                                          ? ShuffleSpecFactories.singlePartition()
                                          : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
-      shuffleSpecFactoryPostAggregation = doLimitOrOffset
-                                          ? ShuffleSpecFactories.singlePartition()
-                                          : resultShuffleSpecFactory;
+
+      if (doLimitOrOffset) {
+        shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
+      } else {
+        shuffleSpecFactoryPostAggregation = resultShuffleSpecFactory;
+      }
+
       partitionBoost = true;
     } else {
       shuffleSpecFactoryPreAggregation = doLimitOrOffset
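
A worked example of the arithmetic above (query and numbers illustrative):

    // SELECT dim, COUNT(*) FROM tbl GROUP BY dim ORDER BY dim LIMIT 10 OFFSET 5
    // => doLimitOrOffset = true, limitSpec.getLimit() = 10, limitSpec.getOffset() = 5
    // => postAggregationLimitHint = 5 + 10 = 15
    // The post-aggregation sort only needs the first 15 fully-grouped rows; the limit
    // stage then discards rows 1-5 and returns rows 6-15.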

View File: ScanQueryKit.java

@@ -143,22 +143,32 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
     );

     ShuffleSpec scanShuffleSpec;
-    if (!hasLimitOrOffset) {
-      // If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc. are respected.
-      scanShuffleSpec = finalShuffleSpec;
-    } else {
+    if (hasLimitOrOffset) {
       // If there is a limit spec, check if there are any non-boost columns to sort in.
-      boolean requiresSort = clusterByColumns.stream()
+      boolean requiresSort =
+          clusterByColumns.stream()
                           .anyMatch(keyColumn -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(keyColumn.columnName()));
       if (requiresSort) {
         // If yes, do a sort into a single partition.
-        scanShuffleSpec = ShuffleSpecFactories.singlePartition().build(clusterBy, false);
+        final long limitHint;
+
+        if (queryToRun.isLimited()
+            && queryToRun.getScanRowsOffset() + queryToRun.getScanRowsLimit() > 0 /* overflow check */) {
+          limitHint = queryToRun.getScanRowsOffset() + queryToRun.getScanRowsLimit();
+        } else {
+          limitHint = ShuffleSpec.UNLIMITED;
+        }
+
+        scanShuffleSpec = ShuffleSpecFactories.singlePartitionWithLimit(limitHint).build(clusterBy, false);
       } else {
         // If the only clusterBy column is the boost column, we just use a mix shuffle to avoid unused shuffling.
         // Note that we still need the boost column to be present in the row signature, since the limit stage would
         // need it to be populated to do its own shuffling later.
         scanShuffleSpec = MixShuffleSpec.instance();
       }
+    } else {
+      // If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc. are respected.
+      scanShuffleSpec = finalShuffleSpec;
     }

     queryDefBuilder.add(
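
The /* overflow check */ guard matters because ScanQuery limits are longs: once offset + limit exceeds Long.MAX_VALUE, the sum wraps negative, and a negative hint would trip SuperSorter's new rowLimit validation. A minimal illustration:

    final long offset = 10;
    final long limit = Long.MAX_VALUE - 1; // a huge but still "limited" value
    System.out.println(offset + limit);    // -9223372036854775800: wrapped, so "> 0" fails
    // In that case ScanQueryKit falls back to limitHint = ShuffleSpec.UNLIMITED.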

View File: MSQSelectTest.java

@@ -1847,7 +1847,7 @@ public class MSQSelectTest extends MSQTestBase
         )
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
-                .with().rows(5),
+                .with().rows(4),
             1, 0, "shuffle"
         )
         .setExpectedCountersForStageWorkerChannel(
@@ -1862,7 +1862,7 @@ public class MSQSelectTest extends MSQTestBase
         )
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
-                .with().rows(5),
+                .with().rows(4),
             2, 0, "input0"
         )
         .setExpectedCountersForStageWorkerChannel(

View File: MSQTaskReportTest.java

@@ -37,6 +37,7 @@ import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
 import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -69,7 +70,8 @@ public class MSQTaskReportTest
                 new GlobalSortMaxCountShuffleSpec(
                     new ClusterBy(ImmutableList.of(new KeyColumn("s", KeyOrder.ASCENDING)), 0),
                     2,
-                    false
+                    false,
+                    ShuffleSpec.UNLIMITED
                 )
             )
             .maxWorkerCount(3)

View File: QueryDefinitionTest.java

@@ -51,7 +51,8 @@ public class QueryDefinitionTest
                 new GlobalSortMaxCountShuffleSpec(
                     new ClusterBy(ImmutableList.of(new KeyColumn("s", KeyOrder.ASCENDING)), 0),
                     2,
-                    false
+                    false,
+                    ShuffleSpec.UNLIMITED
                 )
             )
             .maxWorkerCount(3)

View File: StageDefinitionTest.java

@@ -74,7 +74,8 @@ public class StageDefinitionTest
             new GlobalSortMaxCountShuffleSpec(
                 new ClusterBy(ImmutableList.of(new KeyColumn("test", KeyOrder.ASCENDING)), 0),
                 2,
-                false
+                false,
+                ShuffleSpec.UNLIMITED
             ),
             1,
             false
@@ -95,7 +96,8 @@ public class StageDefinitionTest
             new GlobalSortMaxCountShuffleSpec(
                 new ClusterBy(ImmutableList.of(new KeyColumn("test", KeyOrder.ASCENDING)), 0),
                 1,
-                false
+                false,
+                ShuffleSpec.UNLIMITED
             ),
             1,
             false

View File: MockQueryDefinitionBuilder.java

@@ -139,7 +139,8 @@ public class MockQueryDefinitionBuilder
                 0
             ),
             MAX_NUM_PARTITIONS,
-            false
+            false,
+            ShuffleSpec.UNLIMITED
         );
         break;

View File: FrameChannelMerger.java

@@ -177,15 +177,21 @@ public class FrameChannelMerger implements FrameProcessor<Long>
       return ReturnOrAwait.awaitAll(awaitSet);
     }

+    // Check finished() after populateCurrentFramesAndTournamentTree().
     if (finished()) {
-      // Done!
       return ReturnOrAwait.returnObject(rowsOutput);
     }

     // Generate one output frame and stop for now.
     outputChannel.write(nextFrame());
-    return ReturnOrAwait.runAgain();
+
+    // Check finished() after nextFrame().
+    if (finished()) {
+      return ReturnOrAwait.returnObject(rowsOutput);
+    } else {
+      return ReturnOrAwait.runAgain();
+    }
   }

   private FrameWithPartition nextFrame()
   {
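
A plausible reading of this change (hedged; finished()'s internals are not shown in this diff): emitting a frame via nextFrame() can itself satisfy the merger's row limit, so re-checking finished() right after the write lets the processor return its row count immediately instead of spending one more scheduling round just to discover it is done.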

View File: SuperSorter.java

@@ -123,6 +123,7 @@ public class SuperSorter
 {
   private static final Logger log = new Logger(SuperSorter.class);

+  public static final long UNLIMITED = -1;
   public static final int UNKNOWN_LEVEL = -1;
   public static final long UNKNOWN_TOTAL = -1;
@@ -136,10 +137,8 @@ public class SuperSorter
   private final OutputChannelFactory intermediateOutputChannelFactory;
   private final int maxChannelsPerMerger;
   private final int maxActiveProcessors;
-  private final long rowLimit;
   private final String cancellationId;
   private final boolean removeNullBytes;

   private final Object runWorkersLock = new Object();

   @GuardedBy("runWorkersLock")
@@ -184,6 +183,9 @@ public class SuperSorter
   @GuardedBy("runWorkersLock")
   SuperSorterProgressTracker superSorterProgressTracker;

+  @GuardedBy("runWorkersLock")
+  private long rowLimit;
+
   /**
    * See {@link #setNoWorkRunnable}.
    */
@@ -212,9 +214,9 @@ public class SuperSorter
    * @param maxChannelsPerMerger          maximum number of channels to merge at once, for regular mergers
    *                                      (does not apply to direct mergers; see
    *                                      {@link #getMaxInputBufferFramesForDirectMerging()})
-   * @param rowLimit                      limit to apply during sorting. The limit is merely advisory: the actual number
-   *                                      of rows returned may be larger than the limit. The limit is applied across
-   *                                      all partitions, not to each partition individually.
+   * @param rowLimit                      limit to apply during sorting. The limit is applied across all partitions,
+   *                                      not to each partition individually. Use {@link #UNLIMITED} if there is
+   *                                      no limit.
    * @param cancellationId                cancellation id to use when running processors in the provided
    *                                      {@link FrameProcessorExecutor}.
    * @param superSorterProgressTracker    progress tracker
@@ -262,6 +264,10 @@ public class SuperSorter
     if (maxChannelsPerMerger < 2) {
       throw new IAE("maxChannelsPerMerger[%d] < 2", maxChannelsPerMerger);
     }
+
+    if (rowLimit != UNLIMITED && rowLimit <= 0) {
+      throw new IAE("rowLimit[%d] must be positive", rowLimit);
+    }
   }
@@ -385,6 +391,7 @@ public class SuperSorter
   @GuardedBy("runWorkersLock")
   private void setAllDoneIfPossible()
   {
+    try {
       if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
         // No input data -- generate empty output channels.
         final ClusterByPartitions partitions = getOutputPartitions();
@@ -396,20 +403,31 @@ public class SuperSorter
         // OK to use wrap, not wrapReadOnly, because nil channels are already read-only.
         allDone.set(OutputChannels.wrap(channels));
+      } else if (rowLimit == 0 && activeProcessors == 0) {
+        // We had a row limit, and got it all the way down to zero.
+        // Generate empty output channels for any partitions that we haven't written yet.
+        for (int partitionNum = 0; partitionNum < outputChannels.size(); partitionNum++) {
+          if (outputChannels.get(partitionNum) == null) {
+            outputChannels.set(partitionNum, outputChannelFactory.openNilChannel(partitionNum));
+            superSorterProgressTracker.addMergedBatchesForLevel(totalMergingLevels - 1, 1);
+          }
+        }
+
+        // OK to use wrap, not wrapReadOnly, because all channels in this list are already read-only.
+        allDone.set(OutputChannels.wrap(outputChannels));
       } else if (totalMergingLevels != UNKNOWN_LEVEL
                  && outputsReadyByLevel.containsKey(totalMergingLevels - 1)
                  && (outputsReadyByLevel.get(totalMergingLevels - 1).size() ==
                      getTotalMergersInLevel(totalMergingLevels - 1))) {
         // We're done!!
-        try {
-          // OK to use wrap, not wrapReadOnly, because all channels in this list are already read-only.
-          allDone.set(OutputChannels.wrap(outputChannels));
-        }
-        catch (Throwable e) {
-          allDone.setException(e);
-        }
+        // OK to use wrap, not wrapReadOnly, because all channels in this list are already read-only.
+        allDone.set(OutputChannels.wrap(outputChannels));
       }
+    }
+    catch (Throwable e) {
+      allDone.setException(e);
+    }
   }
@@ -463,6 +481,11 @@ public class SuperSorter
       return false;
     }

+    if (isLimited() && (rowLimit == 0 || activeProcessors > 0)) {
+      // Run final-layer mergers one at a time, to ensure limit is applied across the entire dataset.
+      return false;
+    }
+
     final List<ReadableFrameChannel> in = new ArrayList<>();

     for (final Frame frame : inputBuffer) {
@@ -617,6 +640,11 @@ public class SuperSorter
       return false;
     }

+    if (isLimited() && (rowLimit == 0 || activeProcessors > 0)) {
+      // Run final-layer mergers one at a time, to ensure limit is applied across the entire dataset.
+      return false;
+    }
+
     final int inLevel = totalMergingLevels - 2;
     final int outLevel = inLevel + 1;
     final LongSortedSet inputsReady = outputsReadyByLevel.get(inLevel);
@@ -719,11 +747,20 @@ public class SuperSorter
         rowLimit
     );

-    runWorker(worker, ignored1 -> {
+    runWorker(worker, outputRows -> {
       synchronized (runWorkersLock) {
         outputsReadyByLevel.computeIfAbsent(level, ignored2 -> new LongRBTreeSet())
                            .add(rank);
         superSorterProgressTracker.addMergedBatchesForLevel(level, 1);

+        if (isLimited() && totalMergingLevels != UNKNOWN_LEVEL && level == totalMergingLevels - 1) {
+          rowLimit -= outputRows;
+
+          if (rowLimit < 0) {
+            throw DruidException.defensive("rowLimit[%d] below zero after outputRows[%d]", rowLimit, outputRows);
+          }
+        }
+
         for (PartitionedReadableFrameChannel partitionedReadableFrameChannel : partitionedReadableChannelsToClose) {
           try {
             partitionedReadableFrameChannel.close();
@@ -984,6 +1021,12 @@ public class SuperSorter
     return maxChannelsPerMerger * maxActiveProcessors;
   }

+  @GuardedBy("runWorkersLock")
+  private boolean isLimited()
+  {
+    return rowLimit != UNLIMITED;
+  }
+
   /**
    * Returns a string encapsulating the current state of this object.
    */
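
Putting the SuperSorter pieces together, a hedged trace with illustrative numbers:

    // rowLimit = 15, three final-layer partitions. The isLimited() gate above lets a
    // final-layer merger start only when activeProcessors == 0 and rowLimit > 0, so
    // these mergers run strictly one at a time. Suppose the merger for partition 0
    // emits 15 rows: the runWorker callback subtracts outputRows, leaving rowLimit = 0.
    // No further final-layer merger may start, and setAllDoneIfPossible() sees
    // rowLimit == 0 && activeProcessors == 0, fills partitions 1 and 2 with nil
    // (empty) channels, and credits them to the progress tracker so progress still
    // reaches 100%.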

View File: SuperSorterTest.java

@@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
@@ -58,6 +59,8 @@ import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -131,7 +134,7 @@ public class SuperSorterTest
         new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
         2,
         2,
-        -1,
+        SuperSorter.UNLIMITED,
         null,
         superSorterProgressTracker,
         false
@@ -181,6 +184,40 @@ public class SuperSorterTest
       Assert.assertEquals(1.0, superSorterProgressTracker.snapshot().getProgressDigest(), 0.0f);
       channel.close();
     }
+
+    @Test
+    public void testLimitHint() throws Exception
+    {
+      final BlockingQueueFrameChannel inputChannel = BlockingQueueFrameChannel.minimal();
+      inputChannel.writable().close();
+
+      final SuperSorterProgressTracker superSorterProgressTracker = new SuperSorterProgressTracker();
+
+      final File tempFolder = temporaryFolder.newFolder();
+      final SuperSorter superSorter = new SuperSorter(
+          Collections.singletonList(inputChannel.readable()),
+          FrameReader.create(RowSignature.empty()),
+          Collections.emptyList(),
+          Futures.immediateFuture(ClusterByPartitions.oneUniversalPartition()),
+          exec,
+          new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
+          new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
+          2,
+          2,
+          3,
+          null,
+          superSorterProgressTracker,
+          false
+      );
+
+      final OutputChannels channels = superSorter.run().get();
+      Assert.assertEquals(1, channels.getAllChannels().size());
+
+      final ReadableFrameChannel channel = Iterables.getOnlyElement(channels.getAllChannels()).getReadableChannel();
+      Assert.assertTrue(channel.isFinished());
+      Assert.assertEquals(1.0, superSorterProgressTracker.snapshot().getProgressDigest(), 0.0f);
+      channel.close();
+    }
   }

 /**
@@ -201,6 +238,7 @@ public class SuperSorterTest
     private final int numThreads;
     private final boolean isComposedStorage;
     private final boolean partitionsDeferred;
+    private final long limitHint;

     private StorageAdapter adapter;
     private RowSignature signature;
@@ -216,7 +254,8 @@ public class SuperSorterTest
         int maxChannelsPerProcessor,
         int numThreads,
         boolean isComposedStorage,
-        boolean partitionsDeferred
+        boolean partitionsDeferred,
+        long limitHint
     )
     {
       this.maxRowsPerFrame = maxRowsPerFrame;
@@ -227,6 +266,7 @@ public class SuperSorterTest
       this.numThreads = numThreads;
       this.isComposedStorage = isComposedStorage;
       this.partitionsDeferred = partitionsDeferred;
+      this.limitHint = limitHint;
     }

     @Parameterized.Parameters(
@@ -237,7 +277,8 @@ public class SuperSorterTest
         + "maxChannelsPerProcessor= {4}, "
         + "numThreads = {5}, "
         + "isComposedStorage = {6}, "
-        + "partitionsDeferred = {7}"
+        + "partitionsDeferred = {7}, "
+        + "limitHint = {8}"
     )
     public static Iterable<Object[]> constructorFeeder()
     {
@@ -251,6 +292,7 @@ public class SuperSorterTest
       for (int numThreads : new int[]{1, 3}) {
         for (boolean isComposedStorage : new boolean[]{true, false}) {
           for (boolean partitionsDeferred : new boolean[]{true, false}) {
+            for (long limitHint : new long[]{SuperSorter.UNLIMITED, 3, 1_000}) {
               constructors.add(
                   new Object[]{
                       maxRowsPerFrame,
@@ -260,7 +302,8 @@ public class SuperSorterTest
                       maxChannelsPerProcessor,
                       numThreads,
                       isComposedStorage,
-                      partitionsDeferred
+                      partitionsDeferred,
+                      limitHint
                   }
               );
             }
@@ -271,6 +314,7 @@ public class SuperSorterTest
           }
         }
       }
+      }

       return constructors;
     }
@@ -352,7 +396,7 @@ public class SuperSorterTest
           outputChannelFactory,
           maxActiveProcessors,
           maxChannelsPerProcessor,
-          -1,
+          limitHint,
           null,
           superSorterProgressTracker,
           false
@@ -415,6 +459,10 @@ public class SuperSorterTest
       );
     }

+    if (limitHint != SuperSorter.UNLIMITED) {
+      MatcherAssert.assertThat(readRows.size(), Matchers.greaterThanOrEqualTo(Ints.checkedCast(limitHint)));
+    }
+
     final Sequence<List<Object>> expectedRows = Sequences.sort(
         FrameTestUtil.readRowsFromAdapter(adapter, signature, true),
         Comparator.comparing(
@@ -429,7 +477,7 @@ public class SuperSorterTest
             },
             keyComparator
         )
-    );
+    ).limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size());

     FrameTestUtil.assertRowsEqual(expectedRows, Sequences.simple(readRows));
   }
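
The assertion pair at the end of the parameterized test reflects the advisory contract: with a limitHint, the sorter must emit at least limitHint rows (when the test dataset holds that many) but may emit more, so the test checks readRows.size() >= limitHint and then compares readRows against the equally sized sorted prefix of the expected rows, rather than against an exact row count.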