From d5d170f866f9512ed859f643b84fc11ff341fba4 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Tue, 27 Aug 2019 15:01:07 -0700 Subject: [PATCH] skip unnecessary aggregate(..) calls with LimitedBufferHashGrouper (#8412) * skip unnecessary aggregate(..) calls with LimitedBufferHashGrouper * remove unused bucketWasUsed arg from canSkipAggregate(..) --- .../AbstractBufferHashGrouper.java | 5 ++- .../epinephelinae/BufferHashGrouper.java | 2 +- .../LimitedBufferHashGrouper.java | 34 +++++++++---------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index aae2e4b3a3c..0fe22b8f433 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -76,13 +76,12 @@ public abstract class AbstractBufferHashGrouper implements Grouper implements Grouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper public void newBucketHook(int bucketOffset) { heapIndexUpdater.updateHeapIndexForOffset(bucketOffset, -1); + if (!sortHasNonGroupingFields) { + offsetHeap.addOffset(bucketOffset); + } } @Override - public boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset) + public boolean canSkipAggregate(int bucketOffset) { - if (bucketWasUsed) { - if (!sortHasNonGroupingFields) { - if (heapIndexUpdater.getHeapIndexForOffset(bucketOffset) < 0) { - return true; - } - } - } - return false; + return !sortHasNonGroupingFields && heapIndexUpdater.getHeapIndexForOffset(bucketOffset) < 0; } @Override public void afterAggregateHook(int bucketOffset) { - int heapIndex = heapIndexUpdater.getHeapIndexForOffset(bucketOffset); - if (heapIndex < 0) { - // not in the heap, add it - offsetHeap.addOffset(bucketOffset); - } else if (sortHasNonGroupingFields) { - // Since the sorting columns contain at least one aggregator, we need to remove and reinsert - // the entries after aggregating to maintain proper ordering - offsetHeap.removeAt(heapIndex); - offsetHeap.addOffset(bucketOffset); + if (sortHasNonGroupingFields) { + int heapIndex = heapIndexUpdater.getHeapIndexForOffset(bucketOffset); + + if (heapIndex < 0) { + offsetHeap.addOffset(bucketOffset); + } else { + // Since the sorting columns contain at least one aggregator, we need to remove and reinsert + // the entries after aggregating to maintain proper ordering + offsetHeap.removeAt(heapIndex); + offsetHeap.addOffset(bucketOffset); + } } }