From 8232c0366750238b8c35321abd6f09bfbff10df0 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Sat, 29 Jul 2023 01:45:17 +0000 Subject: [PATCH] [MSQ] Handle dimensionless group by queries with partitioning, and multiple workers (#14678) * fixup * add ut * review --- .../druid/msq/kernel/StageDefinition.java | 2 +- .../msq/querykit/groupby/GroupByQueryKit.java | 6 ++-- .../apache/druid/msq/exec/MSQInsertTest.java | 34 +++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java index 39843db2ba7..c81f58691f1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java @@ -130,7 +130,7 @@ public class StageDefinition this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues; this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get; - if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().getColumns().isEmpty()) { + if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().isEmpty()) { throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 3b4ea36eb08..4ea56f73881 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -120,12 +120,14 @@ public class GroupByQueryKit implements QueryKit // (i.e. 
no GROUP BY clause) // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) - if (intermediateClusterBy.getColumns().isEmpty() && resultClusterBy.isEmpty()) { + if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) { // Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort. shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition(); shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition(); } else if (doOrderBy) { - shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); + shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() + ? ShuffleSpecFactories.singlePartition() + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); shuffleSpecFactoryPostAggregation = doLimitOrOffset ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 0f4d28a5556..009f595bf31 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -437,6 +437,40 @@ public class MSQInsertTest extends MSQTestBase } + @Test + public void testInsertOnFoo1WithTimeAggregatorAndMultipleWorkers() + { + Map<String, Object> localContext = new HashMap<>(context); + localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.MAX.name()); + localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .build(); + + testIngestQuery().setSql( + "INSERT INTO foo1 " 
+ + "SELECT MILLIS_TO_TIMESTAMP((SUM(CAST(\"m1\" AS BIGINT)))) AS __time " + + "FROM foo " + + "PARTITIONED BY DAY" + ) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setQueryContext(localContext) + .setExpectedSegment( + ImmutableSet.of( + SegmentId.of("foo1", Intervals.of("1970-01-01/P1D"), "test", 0) + ) + ) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{21L} + ) + ) + .verifyResults(); + } + + @Test public void testInsertOnFoo1WithTimePostAggregator() {