"Partition boost" the group by queries in MSQ for better splits (#15474)

"Partition boost" the group by queries in MSQ for better splits
This commit is contained in:
Laksh Singla 2024-01-11 12:46:27 +05:30 committed by GitHub
parent 5b769a7d32
commit 87fbe42218
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 23 deletions

View File

@ -60,8 +60,9 @@ import java.util.stream.Collectors;
public class QueryKitUtils public class QueryKitUtils
{ {
/** /**
* Field in frames that stores the partition "boosting" value. Typically used as the last element of a partitioning * Field in frames that stores the partition "boosting" value. Typically, it is used as the last element of a
* key when generating segments. This is an incrementing number that helps split up otherwise too-large partitions. * partitioning key when generating segments. This is an incrementing number that helps split up otherwise too-large
* partitions.
*/ */
public static final String PARTITION_BOOST_COLUMN = "__boost"; public static final String PARTITION_BOOST_COLUMN = "__boost";

View File

@ -30,6 +30,7 @@ import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.processor.ReturnOrAwait; import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.Unit;
@ -52,6 +53,7 @@ import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -75,6 +77,8 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
@Nullable @Nullable
private final HavingSpec havingSpec; private final HavingSpec havingSpec;
private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private Cursor frameCursor = null; private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor; private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null; private ResultRow outputRow = null;
@ -99,8 +103,9 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
this.mergeFn = groupingEngine.createMergeFn(query); this.mergeFn = groupingEngine.createMergeFn(query);
this.finalizeFn = makeFinalizeFn(query); this.finalizeFn = makeFinalizeFn(query);
this.havingSpec = cloneHavingSpec(query); this.havingSpec = cloneHavingSpec(query);
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
this.columnSelectorFactoryForFrameWriter = this.columnSelectorFactoryForFrameWriter =
makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap( makeVirtualColumnsForFrameWriter(partitionBoostVirtualColumn, jsonMapper, query).wrap(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query, query,
() -> outputRow, () -> outputRow,
@ -233,6 +238,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
finalizeFn.accept(outputRow); finalizeFn.accept(outputRow);
if (frameWriter.addSelection()) { if (frameWriter.addSelection()) {
incrementBoostColumn();
outputRow = null; outputRow = null;
return false; return false;
} else if (frameWriter.getNumRows() > 0) { } else if (frameWriter.getNumRows() > 0) {
@ -240,6 +246,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
setUpFrameWriterIfNeeded(); setUpFrameWriterIfNeeded();
if (frameWriter.addSelection()) { if (frameWriter.addSelection()) {
incrementBoostColumn();
outputRow = null; outputRow = null;
return true; return true;
} else { } else {
@ -306,17 +313,29 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
* this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}. * this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}.
*/ */
private static VirtualColumns makeVirtualColumnsForFrameWriter( private static VirtualColumns makeVirtualColumnsForFrameWriter(
@Nullable final VirtualColumn partitionBoostVirtualColumn,
final ObjectMapper jsonMapper, final ObjectMapper jsonMapper,
final GroupByQuery query final GroupByQuery query
) )
{ {
List<VirtualColumn> virtualColumns = new ArrayList<>();
virtualColumns.add(partitionBoostVirtualColumn);
final VirtualColumn segmentGranularityVirtualColumn = final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
if (segmentGranularityVirtualColumn != null) {
if (segmentGranularityVirtualColumn == null) { virtualColumns.add(segmentGranularityVirtualColumn);
return VirtualColumns.EMPTY;
} else {
return VirtualColumns.create(Collections.singletonList(segmentGranularityVirtualColumn));
} }
return VirtualColumns.create(virtualColumns);
}
/**
* Increments the value of the partition boosting column. It should be called once the row value has been written
* to the frame
*/
private void incrementBoostColumn()
{
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
} }
} }

View File

@ -99,15 +99,10 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun); final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun);
final ClusterBy resultClusterByWithoutGranularity = computeClusterByForResults(queryToRun); final ClusterBy resultClusterByWithoutGranularity = computeClusterByForResults(queryToRun);
final ClusterBy resultClusterBy = final ClusterBy resultClusterByWithoutPartitionBoost =
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity, segmentGranularity); QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity, segmentGranularity);
final RowSignature resultSignature =
QueryKitUtils.sortableSignature(
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(queryToRun), segmentGranularity),
resultClusterBy.getColumns()
);
final ClusterBy intermediateClusterBy = computeIntermediateClusterBy(queryToRun); final ClusterBy intermediateClusterBy = computeIntermediateClusterBy(queryToRun);
final boolean doOrderBy = !resultClusterBy.equals(intermediateClusterBy); final boolean doOrderBy = !resultClusterByWithoutPartitionBoost.equals(intermediateClusterBy);
final boolean doLimitOrOffset = final boolean doLimitOrOffset =
queryToRun.getLimitSpec() instanceof DefaultLimitSpec queryToRun.getLimitSpec() instanceof DefaultLimitSpec
&& (((DefaultLimitSpec) queryToRun.getLimitSpec()).isLimited() && (((DefaultLimitSpec) queryToRun.getLimitSpec()).isLimited()
@ -115,23 +110,30 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
final ShuffleSpecFactory shuffleSpecFactoryPreAggregation; final ShuffleSpecFactory shuffleSpecFactoryPreAggregation;
final ShuffleSpecFactory shuffleSpecFactoryPostAggregation; final ShuffleSpecFactory shuffleSpecFactoryPostAggregation;
boolean partitionBoost;
// There can be a situation where intermediateClusterBy is empty, while the result is non-empty if (intermediateClusterBy.isEmpty() && resultClusterByWithoutPartitionBoost.isEmpty()) {
// if we have PARTITIONED BY on anything except ALL, however we don't have a grouping dimension
// (i.e. no GROUP BY clause)
// __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a
// post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
// Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort. // Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort.
// This condition will be triggered when we don't have a grouping dimension, no partitioning granularity
// (PARTITIONED BY ALL) and no ordering/clustering dimensions
// For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY ALL
shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition(); shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition(); shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition();
partitionBoost = false;
} else if (doOrderBy) { } else if (doOrderBy) {
// There can be a situation where intermediateClusterBy is empty, while the resultClusterBy is non-empty
// if we have PARTITIONED BY on anything except ALL, however we don't have a grouping dimension
// (i.e. no GROUP BY clause)
// __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a
// post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
// For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY
shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition() ? ShuffleSpecFactories.singlePartition()
: ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPostAggregation = doLimitOrOffset shuffleSpecFactoryPostAggregation = doLimitOrOffset
? ShuffleSpecFactories.singlePartition() ? ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory; : resultShuffleSpecFactory;
partitionBoost = true;
} else { } else {
shuffleSpecFactoryPreAggregation = doLimitOrOffset shuffleSpecFactoryPreAggregation = doLimitOrOffset
? ShuffleSpecFactories.singlePartition() ? ShuffleSpecFactories.singlePartition()
@ -139,6 +141,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
// null: retain partitions from input (i.e. from preAggregation). // null: retain partitions from input (i.e. from preAggregation).
shuffleSpecFactoryPostAggregation = null; shuffleSpecFactoryPostAggregation = null;
partitionBoost = false;
} }
queryDefBuilder.add( queryDefBuilder.add(
@ -151,6 +154,18 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
.processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun)) .processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun))
); );
ClusterBy resultClusterBy = computeResultClusterBy(
queryToRun,
segmentGranularity,
partitionBoost
);
RowSignature resultSignature = computeResultSignature(
queryToRun,
segmentGranularity,
resultClusterBy,
partitionBoost
);
queryDefBuilder.add( queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1) StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber)) .inputs(new StageInputSpec(firstStageNumber))
@ -188,7 +203,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all * Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
* aggregations are nonfinalized. * aggregations are nonfinalized.
*/ */
static RowSignature computeIntermediateSignature(final GroupByQuery query) private static RowSignature computeIntermediateSignature(final GroupByQuery query)
{ {
final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO); final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO);
final RowSignature.Builder builder = RowSignature.builder(); final RowSignature.Builder builder = RowSignature.builder();
@ -207,13 +222,67 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
* Result signature of a particular {@link GroupByQuery}. Includes post-aggregators, and aggregations are * Result signature of a particular {@link GroupByQuery}. Includes post-aggregators, and aggregations are
* finalized by default. (But may be nonfinalized, depending on {@link #isFinalize}. * finalized by default. (But may be nonfinalized, depending on {@link #isFinalize}.
*/ */
static RowSignature computeResultSignature(final GroupByQuery query) private static RowSignature computeResultSignature(final GroupByQuery query)
{ {
final RowSignature.Finalization finalization = final RowSignature.Finalization finalization =
isFinalize(query) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO; isFinalize(query) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO;
return query.getResultRowSignature(finalization); return query.getResultRowSignature(finalization);
} }
/**
* Computes the result clusterBy which may or may not have the partition boosted column, depending on the
* {@code partitionBoost} parameter passed
*/
private static ClusterBy computeResultClusterBy(
final GroupByQuery query,
final Granularity segmentGranularity,
final boolean partitionBoost
)
{
final ClusterBy resultClusterByWithoutGranularity = computeClusterByForResults(query);
final ClusterBy resultClusterByWithoutPartitionBoost =
QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity, segmentGranularity);
if (!partitionBoost) {
return resultClusterByWithoutPartitionBoost;
}
List<KeyColumn> resultClusterByWithPartitionBoostColumns = new ArrayList<>(resultClusterByWithoutPartitionBoost.getColumns());
resultClusterByWithPartitionBoostColumns.add(new KeyColumn(
QueryKitUtils.PARTITION_BOOST_COLUMN,
KeyOrder.ASCENDING
));
return new ClusterBy(
resultClusterByWithPartitionBoostColumns,
resultClusterByWithoutPartitionBoost.getBucketByCount()
);
}
/**
* Computes the result signature which may or may not have the partition boosted column depending on the
* {@code partitionBoost} passed. It expects that the clusterBy already has the partition boost column
* if the parameter {@code partitionBoost} is set as true.
*/
private static RowSignature computeResultSignature(
final GroupByQuery query,
final Granularity segmentGranularity,
final ClusterBy resultClusterBy,
final boolean partitionBoost
)
{
final RowSignature resultSignatureWithoutPartitionBoost =
QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(query), segmentGranularity);
if (!partitionBoost) {
return QueryKitUtils.sortableSignature(resultSignatureWithoutPartitionBoost, resultClusterBy.getColumns());
}
final RowSignature resultSignatureWithPartitionBoost =
RowSignature.builder().addAll(resultSignatureWithoutPartitionBoost)
.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG)
.build();
return QueryKitUtils.sortableSignature(resultSignatureWithPartitionBoost, resultClusterBy.getColumns());
}
/** /**
* Whether aggregations appearing in the result of a query must be finalized. * Whether aggregations appearing in the result of a query must be finalized.
* *