diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index f170550a102..6f1cee955d1 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -975,7 +975,8 @@ public class GroupByQueryEngineV2
         query.getDimensions(),
         getDimensionComparators(limitSpec),
         query.getResultRowHasTimestamp(),
-        query.getContextSortByDimsFirst()
+        query.getContextSortByDimsFirst(),
+        keySize
     );
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java
index f8d8488d782..467a7296247 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GrouperBufferComparatorUtils.java
@@ -126,7 +126,8 @@ public class GrouperBufferComparatorUtils
       List<DimensionSpec> dimensions,
       Grouper.BufferComparator[] dimComparators,
       boolean includeTimestamp,
-      boolean sortByDimsFirst
+      boolean sortByDimsFirst,
+      int keyBufferTotalSize
   )
   {
     int dimCount = dimensions.size();
@@ -148,7 +149,8 @@
       if (aggIndex >= 0) {
         final StringComparator stringComparator = orderSpec.getDimensionComparator();
         final ValueType valueType = aggregatorFactories[aggIndex].getType();
-        final int aggOffset = aggregatorOffsets[aggIndex] - Integer.BYTES;
+        // Aggregators start after dimensions
+        final int aggOffset = keyBufferTotalSize + aggregatorOffsets[aggIndex];
 
         aggCount++;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index f812f14ccd4..8a3e2a46b6c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -1252,7 +1252,10 @@ public class RowBasedGrouperHelper
           dimensions,
           serdeHelperComparators,
           includeTimestamp,
-          sortByDimsFirst
+          sortByDimsFirst,
+          Arrays.stream(serdeHelpers)
+                .mapToInt(RowBasedKeySerdeHelper::getKeyBufferValueSize)
+                .sum()
       );
     }
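The substantive change is the aggregator offset computation in GrouperBufferComparatorUtils: when a limit spec orders on an aggregator (a non-grouping field), the buffer comparator must locate that aggregator's bytes inside the grouping-key buffer, and aggregators are serialized after all of the dimension key parts. The new keyBufferTotalSize parameter, computed in RowBasedGrouperHelper as the sum of the serde helpers' key sizes, is therefore the correct base offset, whereas the old expression aggregatorOffsets[aggIndex] - Integer.BYTES could point into the dimension region. A minimal standalone sketch of that layout assumption (hypothetical sizes and class name, not Druid's actual serde):

import java.nio.ByteBuffer;

class AggOffsetSketch
{
  // Key buffer layout assumed by the fix: [ dim0 | dim1 | ... | aggs ],
  // so an aggregator's absolute offset is keyBufferTotalSize + its agg-relative offset.
  static long readLongAgg(ByteBuffer keyBuffer, int position, int keyBufferTotalSize, int aggOffset)
  {
    // Skip the whole grouping key, then apply the aggregator-relative offset.
    return keyBuffer.getLong(position + keyBufferTotalSize + aggOffset);
  }

  public static void main(String[] args)
  {
    // Hypothetical key: one string dim (4-byte dictionary id) plus one long dim.
    final int keyBufferTotalSize = Integer.BYTES + Long.BYTES;
    ByteBuffer buf = ByteBuffer.allocate(keyBufferTotalSize + Long.BYTES);
    buf.putInt(7);    // dim 0: string dictionary id
    buf.putLong(42L); // dim 1: long-typed dimension
    buf.putLong(16L); // agg 0: long sum, serialized after all dims
    // The old subtraction-based offset would have read inside the dim region here.
    System.out.println(readLongAgg(buf, 0, keyBufferTotalSize, 0)); // prints 16
  }
}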
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 3c62e11ea1a..7cf7e65d2bb 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.collections.CloseableDefaultBlockingPool;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
@@ -140,6 +141,14 @@ public class GroupByLimitPushDownMultiNodeMergeTest
   }
 
   private IncrementalIndex makeIncIndex(boolean withRollup)
+  {
+    return makeIncIndex(withRollup, Arrays.asList(
+        new StringDimensionSchema("dimA"),
+        new LongDimensionSchema("metA")
+    ));
+  }
+
+  private IncrementalIndex makeIncIndex(boolean withRollup, List<DimensionSchema> dimensions)
   {
     return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
@@ -311,7 +320,217 @@ public class GroupByLimitPushDownMultiNodeMergeTest
     );
     QueryableIndex qindexD = INDEX_IO.loadIndex(fileD);
 
-    groupByIndices = Arrays.asList(qindexA, qindexB, qindexC, qindexD);
+    List<String> dimNames2 = Arrays.asList("dimA", "dimB", "metA");
+    List<DimensionSchema> dimensions = Arrays.asList(
+        new StringDimensionSchema("dimA"),
+        new StringDimensionSchema("dimB"),
+        new LongDimensionSchema("metA")
+    );
+    final IncrementalIndex indexE = makeIncIndex(false, dimensions);
+    incrementalIndices.add(indexE);
+
+    event = new HashMap<>();
+    event.put("dimA", "pomegranate");
+    event.put("dimB", "raw");
+    event.put("metA", 5L);
+    row = new MapBasedInputRow(1505260800000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "mango");
+    event.put("dimB", "ripe");
+    event.put("metA", 9L);
+    row = new MapBasedInputRow(1605260800000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "pomegranate");
+    event.put("dimB", "raw");
+    event.put("metA", 3L);
+    row = new MapBasedInputRow(1705264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "mango");
+    event.put("dimB", "ripe");
+    event.put("metA", 7L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "grape");
+    event.put("dimB", "raw");
+    event.put("metA", 5L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "apple");
+    event.put("dimB", "ripe");
+    event.put("metA", 3L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "apple");
+    event.put("dimB", "raw");
+    event.put("metA", 1L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "apple");
+    event.put("dimB", "ripe");
+    event.put("metA", 4L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "apple");
+    event.put("dimB", "raw");
+    event.put("metA", 1L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "banana");
+    event.put("dimB", "ripe");
+    event.put("metA", 4L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "orange");
+    event.put("dimB", "raw");
+    event.put("metA", 9L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "peach");
+    event.put("dimB", "ripe");
+    event.put("metA", 7L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "orange");
+    event.put("dimB", "raw");
+    event.put("metA", 2L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "strawberry");
+    event.put("dimB", "ripe");
+    event.put("metA", 10L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexE.add(row);
+
+    final File fileE = INDEX_MERGER_V9.persist(
+        indexE,
+        new File(tmpDir, "E"),
+        new IndexSpec(),
+        null
+    );
+    QueryableIndex qindexE = INDEX_IO.loadIndex(fileE);
+
+    final IncrementalIndex indexF = makeIncIndex(false, dimensions);
+    incrementalIndices.add(indexF);
+
+    event = new HashMap<>();
+    event.put("dimA", "kiwi");
+    event.put("dimB", "raw");
+    event.put("metA", 7L);
+    row = new MapBasedInputRow(1505260800000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "watermelon");
+    event.put("dimB", "ripe");
+    event.put("metA", 14L);
+    row = new MapBasedInputRow(1605260800000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "kiwi");
+    event.put("dimB", "raw");
+    event.put("metA", 8L);
+    row = new MapBasedInputRow(1705264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "kiwi");
+    event.put("dimB", "ripe");
+    event.put("metA", 8L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "lemon");
+    event.put("dimB", "raw");
+    event.put("metA", 3L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "cherry");
+    event.put("dimB", "ripe");
+    event.put("metA", 2L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "cherry");
+    event.put("dimB", "raw");
+    event.put("metA", 7L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "avocado");
+    event.put("dimB", "ripe");
+    event.put("metA", 12L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "cherry");
+    event.put("dimB", "raw");
+    event.put("metA", 3L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "plum");
+    event.put("dimB", "ripe");
+    event.put("metA", 5L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "plum");
+    event.put("dimB", "raw");
+    event.put("metA", 3L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    event = new HashMap<>();
+    event.put("dimA", "lime");
+    event.put("dimB", "ripe");
+    event.put("metA", 7L);
+    row = new MapBasedInputRow(1805264400000L, dimNames2, event);
+    indexF.add(row);
+
+    final File fileF = INDEX_MERGER_V9.persist(
+        indexF,
+        new File(tmpDir, "F"),
+        new IndexSpec(),
+        null
+    );
+    QueryableIndex qindexF = INDEX_IO.loadIndex(fileF);
+
+    groupByIndices = Arrays.asList(qindexA, qindexB, qindexC, qindexD, qindexE, qindexF);
     resourceCloser = Closer.create();
     setupGroupByFactory();
   }
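As a sanity check on this fixture data, the per-group sums can be recomputed by hand: mango/ripe = 9 + 7 = 16, kiwi/raw = 7 + 8 = 15, watermelon/ripe = 14, avocado/ripe = 12, and orange/raw = 9 + 2 = 11, with strawberry/ripe next at 10, so the top five are unambiguous. The following throwaway snippet (not part of the patch) reproduces the expected top five the new test asserts below:

import java.util.HashMap;
import java.util.Map;

class ExpectedTop5Check
{
  public static void main(String[] args)
  {
    // (dimA, dimB, metA) triples copied from the indexE and indexF events above.
    String[][] rows = {
        {"pomegranate", "raw", "5"}, {"mango", "ripe", "9"}, {"pomegranate", "raw", "3"},
        {"mango", "ripe", "7"}, {"grape", "raw", "5"}, {"apple", "ripe", "3"},
        {"apple", "raw", "1"}, {"apple", "ripe", "4"}, {"apple", "raw", "1"},
        {"banana", "ripe", "4"}, {"orange", "raw", "9"}, {"peach", "ripe", "7"},
        {"orange", "raw", "2"}, {"strawberry", "ripe", "10"},
        {"kiwi", "raw", "7"}, {"watermelon", "ripe", "14"}, {"kiwi", "raw", "8"},
        {"kiwi", "ripe", "8"}, {"lemon", "raw", "3"}, {"cherry", "ripe", "2"},
        {"cherry", "raw", "7"}, {"avocado", "ripe", "12"}, {"cherry", "raw", "3"},
        {"plum", "ripe", "5"}, {"plum", "raw", "3"}, {"lime", "ripe", "7"}
    };
    // Sum metA per (dimA, dimB) group, then print the top 5 by descending sum.
    Map<String, Long> sums = new HashMap<>();
    for (String[] r : rows) {
      sums.merge(r[0] + "/" + r[1], Long.parseLong(r[2]), Long::sum);
    }
    sums.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(5)
        .forEach(e -> System.out.println(e.getKey() + " = " + e.getValue()));
    // Prints: mango/ripe = 16, kiwi/raw = 15, watermelon/ripe = 14,
    //         avocado/ripe = 12, orange/raw = 11
  }
}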
@@ -704,6 +923,95 @@ public class GroupByLimitPushDownMultiNodeMergeTest
     Assert.assertEquals(expectedRow3, results.get(3));
   }
 
+  @Test
+  public void testForcePushLimitDownAccuracyWhenSortHasNonGroupingFields()
+  {
+    // The two test segments have non-overlapping groups, so the result should be 100% accurate
+    // even when forceLimitPushDown is applied
+    List<ResultRow> resultsWithoutLimitPushDown = testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(ImmutableMap.of());
+    List<ResultRow> resultsWithLimitPushDown = testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(ImmutableMap.of(
+        GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true,
+        GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true
+    ));
+
+    List<ResultRow> expectedResults = ImmutableList.of(
+        ResultRow.of("mango", "ripe", 16),
+        ResultRow.of("kiwi", "raw", 15),
+        ResultRow.of("watermelon", "ripe", 14),
+        ResultRow.of("avocado", "ripe", 12),
+        ResultRow.of("orange", "raw", 11)
+    );
+
+    Assert.assertEquals(expectedResults.toString(), resultsWithoutLimitPushDown.toString());
+    Assert.assertEquals(expectedResults.toString(), resultsWithLimitPushDown.toString());
+  }
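The accuracy claim in the comment above rests on the segments being disjoint: with limit push down forced, each node returns only its local top 5, and because no (dimA, dimB) group spans both segments, every group's complete sum lives on exactly one node, so the union of the two local lists must contain the true global top 5. With overlapping groups this does not hold, which is why forceLimitPushDown normally trades accuracy for speed. A toy illustration of the failure mode (hypothetical data, not part of the patch):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PushDownAccuracySketch
{
  // Local top-k by descending value, mimicking a pushed-down limit on one node.
  static List<Map.Entry<String, Long>> top(Map<String, Long> m, int k)
  {
    return m.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(k)
            .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    // Group "b" sums to 12 globally but falls below the local top-2 cutoff
    // on both nodes, so merging the pushed-down partial results misses it.
    Map<String, Long> node1 = Map.of("a", 10L, "c", 7L, "b", 6L);
    Map<String, Long> node2 = Map.of("d", 9L, "e", 8L, "b", 6L);
    Map<String, Long> merged = new HashMap<>();
    Stream.concat(top(node1, 2).stream(), top(node2, 2).stream())
          .forEach(e -> merged.merge(e.getKey(), e.getValue(), Long::sum));
    System.out.println(top(merged, 2)); // [a=10, d=9], but the true top 2 is [b=12, a=10]
  }
}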
+
+  private List<ResultRow> testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map<String, Object> context)
+  {
+    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            groupByFactory.mergeRunners(executorService, getRunner1(4))
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            groupByFactory2.mergeRunners(executorService, getRunner2(5))
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    QueryRunner<ResultRow> finalRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            new QueryRunner<ResultRow>()
+            {
+              @Override
+              public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext responseContext)
+              {
+                return Sequences
+                    .simple(
+                        ImmutableList.of(
+                            theRunner.run(queryPlus, responseContext),
+                            theRunner2.run(queryPlus, responseContext)
+                        )
+                    )
+                    .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering());
+              }
+            }
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
+        Collections.singletonList(Intervals.utc(1500000000000L, 1900000000000L))
+    );
+
+    DefaultLimitSpec ls = new DefaultLimitSpec(
+        Collections.singletonList(
+            new OrderByColumnSpec("a0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.NUMERIC)
+        ),
+        5
+    );
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("blah")
+        .setQuerySegmentSpec(intervalSpec)
+        .setDimensions(
+            new DefaultDimensionSpec("dimA", "d0", ValueType.STRING),
+            new DefaultDimensionSpec("dimB", "d1", ValueType.STRING)
+        ).setAggregatorSpecs(new LongSumAggregatorFactory("a0", "metA"))
+        .setLimitSpec(ls)
+        .setContext(context)
+        .setGranularity(Granularities.ALL)
+        .build();
+
+    Sequence<ResultRow> queryResult = finalRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
+    return queryResult.toList();
+  }
+
   private List<QueryRunner<ResultRow>> getRunner1(int qIndexNumber)
   {
     List<QueryRunner<ResultRow>> runners = new ArrayList<>();