From 3de12729269e8446b994881bd507bc0d1422e98e Mon Sep 17 00:00:00 2001 From: mchades <793098424@qq.com> Date: Sat, 12 Mar 2022 01:04:34 +0800 Subject: [PATCH] bug fix: merge results of group by limit push down (#11969) --- .../druid/query/groupby/GroupByQuery.java | 6 +- .../query/groupby/GroupByQueryRunnerTest.java | 70 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index f02d3596dea..3f0b9a163ad 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -592,7 +592,11 @@ public class GroupByQuery extends BaseQuery needsReverseList.add(false); final ColumnType type = dimensions.get(i).getOutputType(); dimensionTypes.add(type); - comparators.add(StringComparators.LEXICOGRAPHIC); + if (type.isNumeric()) { + comparators.add(StringComparators.NUMERIC); + } else { + comparators.add(StringComparators.LEXICOGRAPHIC); + } } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index cf9490c7073..a48f2d936f1 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -11176,6 +11176,76 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); } + @Test + public void testMergeLimitPushDownResultsWithLongDimensionNotInLimitSpec() + { + // Cannot vectorize due to extraction dimension spec. + cannotVectorize(); + + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } + GroupByQuery.Builder builder = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(new ExtractionDimensionSpec("quality", "qualityLen", ColumnType.LONG, StrlenExtractionFn.instance())) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setLimitSpec( + new DefaultLimitSpec( + Collections.emptyList(), + 20 + ) + ) + .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) + .setGranularity(Granularities.ALL); + + final GroupByQuery allGranQuery = builder.build(); + + QueryRunner mergedRunner = factory.getToolchest().mergeResults( + (queryPlus, responseContext) -> { + // simulate two daily segments + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) + ); + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) + ); + + return factory.getToolchest().mergeResults( + (queryPlus3, responseContext1) -> new MergeSequence<>( + queryPlus3.getQuery().getResultOrdering(), + Sequences.simple( + Arrays.asList( + runner.run(queryPlus1, responseContext1), + runner.run(queryPlus2, responseContext1) + ) + ) + ) + ).run(queryPlus, responseContext); + } + ); + Map context = new HashMap<>(); + List allGranExpectedResults = Arrays.asList( + makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 7L, "rows", 6L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 8L, "rows", 2L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 9L, "rows", 6L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 10L, "rows", 4L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 13L, "rows", 2L) + ); + + TestHelper.assertExpectedObjects( + allGranExpectedResults, + mergedRunner.run(QueryPlus.wrap(allGranQuery)), + "merged" + ); + } + @Test public void testMergeResultsWithLimitPushDown() {