mirror of https://github.com/apache/druid.git
skip unnecessary aggregate(..) calls with LimitedBufferHashGrouper (#8412)
* skip unnecessary aggregate(..) calls with LimitedBufferHashGrouper * remove unused bucketWasUsed arg from canSkipAggregate(..)
This commit is contained in:
parent
4d87a19547
commit
d5d170f866
|
@ -76,13 +76,12 @@ public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyT
|
|||
/**
|
||||
* Called to check if it's possible to skip aggregation for a row.
|
||||
*
|
||||
* @param bucketWasUsed Was the row a new entry in the hash table?
|
||||
* @param bucketOffset Offset of the bucket containing this row's entry in the hash table,
|
||||
* within the buffer returned by hashTable.getTableBuffer()
|
||||
*
|
||||
* @return true if aggregation can be skipped, false otherwise.
|
||||
*/
|
||||
public abstract boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset);
|
||||
public abstract boolean canSkipAggregate(int bucketOffset);
|
||||
|
||||
/**
|
||||
* Called after a row is aggregated. An implementing BufferHashGrouper class can use this to update
|
||||
|
@ -154,7 +153,7 @@ public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyT
|
|||
newBucketHook(bucketStartOffset);
|
||||
}
|
||||
|
||||
if (canSkipAggregate(bucketWasUsed, bucketStartOffset)) {
|
||||
if (canSkipAggregate(bucketStartOffset)) {
|
||||
return AggregateResult.ok();
|
||||
}
|
||||
|
||||
|
|
|
@ -244,7 +244,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset)
|
||||
public boolean canSkipAggregate(int bucketOffset)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -146,35 +146,33 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
|
|||
public void newBucketHook(int bucketOffset)
|
||||
{
|
||||
heapIndexUpdater.updateHeapIndexForOffset(bucketOffset, -1);
|
||||
if (!sortHasNonGroupingFields) {
|
||||
offsetHeap.addOffset(bucketOffset);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset)
|
||||
public boolean canSkipAggregate(int bucketOffset)
|
||||
{
|
||||
if (bucketWasUsed) {
|
||||
if (!sortHasNonGroupingFields) {
|
||||
if (heapIndexUpdater.getHeapIndexForOffset(bucketOffset) < 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return !sortHasNonGroupingFields && heapIndexUpdater.getHeapIndexForOffset(bucketOffset) < 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAggregateHook(int bucketOffset)
|
||||
{
|
||||
if (sortHasNonGroupingFields) {
|
||||
int heapIndex = heapIndexUpdater.getHeapIndexForOffset(bucketOffset);
|
||||
|
||||
if (heapIndex < 0) {
|
||||
// not in the heap, add it
|
||||
offsetHeap.addOffset(bucketOffset);
|
||||
} else if (sortHasNonGroupingFields) {
|
||||
} else {
|
||||
// Since the sorting columns contain at least one aggregator, we need to remove and reinsert
|
||||
// the entries after aggregating to maintain proper ordering
|
||||
offsetHeap.removeAt(heapIndex);
|
||||
offsetHeap.addOffset(bucketOffset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
|
|
Loading…
Reference in New Issue