From 87fbe422187e6e7fa02983fbc70af95bc371e3f1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 11 Jan 2024 12:46:27 +0530 Subject: [PATCH] "Partition boost" the group by queries in MSQ for better splits (#15474) "Partition boost" the group by queries in MSQ for better splits --- .../druid/msq/querykit/QueryKitUtils.java | 5 +- .../GroupByPostShuffleFrameProcessor.java | 31 ++++-- .../msq/querykit/groupby/GroupByQueryKit.java | 99 ++++++++++++++++--- 3 files changed, 112 insertions(+), 23 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java index 4791c33de6b..e5f2a0152fd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java @@ -60,8 +60,9 @@ import java.util.stream.Collectors; public class QueryKitUtils { /** - * Field in frames that stores the partition "boosting" value. Typically used as the last element of a partitioning - * key when generating segments. This is an incrementing number that helps split up otherwise too-large partitions. + * Field in frames that stores the partition "boosting" value. Typically, it is used as the last element of a + * partitioning key when generating segments. This is an incrementing number that helps split up otherwise too-large + * partitions. */ public static final String PARTITION_BOOST_COLUMN = "__boost"; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java index ed14dd52739..fb39118a59f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java @@ -30,6 +30,7 @@ import org.apache.druid.frame.processor.FrameProcessors; import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.processor.ReturnOrAwait; import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.java.util.common.Unit; @@ -52,6 +53,7 @@ import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -75,6 +77,8 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor @Nullable private final HavingSpec havingSpec; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private Cursor frameCursor = null; private Supplier rowSupplierFromFrameCursor; private ResultRow outputRow = null; @@ -99,8 +103,9 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor this.mergeFn = groupingEngine.createMergeFn(query); this.finalizeFn = makeFinalizeFn(query); this.havingSpec = cloneHavingSpec(query); + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); this.columnSelectorFactoryForFrameWriter = - makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap( + makeVirtualColumnsForFrameWriter(partitionBoostVirtualColumn, jsonMapper, query).wrap( RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( query, () -> outputRow, @@ -233,6 +238,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor finalizeFn.accept(outputRow); if (frameWriter.addSelection()) { + incrementBoostColumn(); outputRow = null; return false; } else if (frameWriter.getNumRows() > 0) { @@ -240,6 +246,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor setUpFrameWriterIfNeeded(); if (frameWriter.addSelection()) { + incrementBoostColumn(); outputRow = null; return true; } else { @@ -306,17 +313,29 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor * this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}. */ private static VirtualColumns makeVirtualColumnsForFrameWriter( + @Nullable final VirtualColumn partitionBoostVirtualColumn, final ObjectMapper jsonMapper, final GroupByQuery query ) { + List virtualColumns = new ArrayList<>(); + + virtualColumns.add(partitionBoostVirtualColumn); final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); - - if (segmentGranularityVirtualColumn == null) { - return VirtualColumns.EMPTY; - } else { - return VirtualColumns.create(Collections.singletonList(segmentGranularityVirtualColumn)); + if (segmentGranularityVirtualColumn != null) { + virtualColumns.add(segmentGranularityVirtualColumn); } + + return VirtualColumns.create(virtualColumns); + } + + /** + * Increments the value of the partition boosting column. It should be called once the row value has been written + * to the frame + */ + private void incrementBoostColumn() + { + partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 469a8a8aa46..dca388b0338 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -99,15 +99,10 @@ public class GroupByQueryKit implements QueryKit QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun); final ClusterBy resultClusterByWithoutGranularity = computeClusterByForResults(queryToRun); - final ClusterBy resultClusterBy = + final ClusterBy resultClusterByWithoutPartitionBoost = QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity, segmentGranularity); - final RowSignature resultSignature = - QueryKitUtils.sortableSignature( - QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(queryToRun), segmentGranularity), - resultClusterBy.getColumns() - ); final ClusterBy intermediateClusterBy = computeIntermediateClusterBy(queryToRun); - final boolean doOrderBy = !resultClusterBy.equals(intermediateClusterBy); + final boolean doOrderBy = !resultClusterByWithoutPartitionBoost.equals(intermediateClusterBy); final boolean doLimitOrOffset = queryToRun.getLimitSpec() instanceof DefaultLimitSpec && (((DefaultLimitSpec) queryToRun.getLimitSpec()).isLimited() @@ -115,23 +110,30 @@ public class GroupByQueryKit implements QueryKit final ShuffleSpecFactory shuffleSpecFactoryPreAggregation; final ShuffleSpecFactory shuffleSpecFactoryPostAggregation; + boolean partitionBoost; - // There can be a situation where intermediateClusterBy is empty, while the result is non-empty - // if we have PARTITIONED BY on anything except ALL, however we don't have a grouping dimension - // (i.e. no GROUP BY clause) - // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a - // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) - if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) { + if (intermediateClusterBy.isEmpty() && resultClusterByWithoutPartitionBoost.isEmpty()) { // Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort. + // This condition will be triggered when we don't have a grouping dimension, no partitioning granularity + // (PARTITIONED BY ALL) and no ordering/clustering dimensions + // For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY ALL shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition(); shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition(); + partitionBoost = false; } else if (doOrderBy) { + // There can be a situation where intermediateClusterBy is empty, while the resultClusterBy is non-empty + // if we have PARTITIONED BY on anything except ALL, however we don't have a grouping dimension + // (i.e. no GROUP BY clause) + // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a + // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) + // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); shuffleSpecFactoryPostAggregation = doLimitOrOffset ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; + partitionBoost = true; } else { shuffleSpecFactoryPreAggregation = doLimitOrOffset ? ShuffleSpecFactories.singlePartition() @@ -139,6 +141,7 @@ public class GroupByQueryKit implements QueryKit // null: retain partitions from input (i.e. from preAggregation). shuffleSpecFactoryPostAggregation = null; + partitionBoost = false; } queryDefBuilder.add( @@ -151,6 +154,18 @@ public class GroupByQueryKit implements QueryKit .processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun)) ); + ClusterBy resultClusterBy = computeResultClusterBy( + queryToRun, + segmentGranularity, + partitionBoost + ); + RowSignature resultSignature = computeResultSignature( + queryToRun, + segmentGranularity, + resultClusterBy, + partitionBoost + ); + queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) @@ -188,7 +203,7 @@ public class GroupByQueryKit implements QueryKit * Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all * aggregations are nonfinalized. */ - static RowSignature computeIntermediateSignature(final GroupByQuery query) + private static RowSignature computeIntermediateSignature(final GroupByQuery query) { final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO); final RowSignature.Builder builder = RowSignature.builder(); @@ -207,13 +222,67 @@ public class GroupByQueryKit implements QueryKit * Result signature of a particular {@link GroupByQuery}. Includes post-aggregators, and aggregations are * finalized by default. (But may be nonfinalized, depending on {@link #isFinalize}. */ - static RowSignature computeResultSignature(final GroupByQuery query) + private static RowSignature computeResultSignature(final GroupByQuery query) { final RowSignature.Finalization finalization = isFinalize(query) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO; return query.getResultRowSignature(finalization); } + /** + * Computes the result clusterBy which may or may not have the partition boosted column, depending on the + * {@code partitionBoost} parameter passed + */ + private static ClusterBy computeResultClusterBy( + final GroupByQuery query, + final Granularity segmentGranularity, + final boolean partitionBoost + ) + { + final ClusterBy resultClusterByWithoutGranularity = computeClusterByForResults(query); + final ClusterBy resultClusterByWithoutPartitionBoost = + QueryKitUtils.clusterByWithSegmentGranularity(resultClusterByWithoutGranularity, segmentGranularity); + if (!partitionBoost) { + return resultClusterByWithoutPartitionBoost; + } + List resultClusterByWithPartitionBoostColumns = new ArrayList<>(resultClusterByWithoutPartitionBoost.getColumns()); + resultClusterByWithPartitionBoostColumns.add(new KeyColumn( + QueryKitUtils.PARTITION_BOOST_COLUMN, + KeyOrder.ASCENDING + )); + return new ClusterBy( + resultClusterByWithPartitionBoostColumns, + resultClusterByWithoutPartitionBoost.getBucketByCount() + ); + } + + /** + * Computes the result signature which may or may not have the partition boosted column depending on the + * {@code partitionBoost} passed. It expects that the clusterBy already has the partition boost column + * if the parameter {@code partitionBoost} is set as true. + */ + private static RowSignature computeResultSignature( + final GroupByQuery query, + final Granularity segmentGranularity, + final ClusterBy resultClusterBy, + final boolean partitionBoost + ) + { + final RowSignature resultSignatureWithoutPartitionBoost = + QueryKitUtils.signatureWithSegmentGranularity(computeResultSignature(query), segmentGranularity); + + if (!partitionBoost) { + return QueryKitUtils.sortableSignature(resultSignatureWithoutPartitionBoost, resultClusterBy.getColumns()); + } + + final RowSignature resultSignatureWithPartitionBoost = + RowSignature.builder().addAll(resultSignatureWithoutPartitionBoost) + .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG) + .build(); + + return QueryKitUtils.sortableSignature(resultSignatureWithPartitionBoost, resultClusterBy.getColumns()); + } + /** * Whether aggregations appearing in the result of a query must be finalized. *