From 274ccbfd85c86a1333344c4ac2aedce241027728 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 24 Apr 2024 02:39:24 -0700 Subject: [PATCH] Reset buffer aggregators when resetting Groupers. (#16296) Buffer aggregators can contain some cached objects within them, such as Memory references or HLL Unions. Prior to this patch, various Grouper implementations were not releasing this state when resetting their own internal state, which could lead to excessive memory use. This patch renames AggregatorAdapater#close to "reset", and updates Grouper implementations to call this reset method whenever they reset their internal state. The base method on BufferAggregator and VectorAggregator remains named "close", for compatibility with existing extensions, but the contract is adjusted to say that the aggregator may be reused after the method is called. All existing implementations in core already adhere to this new contract, except for the ArrayOfDoubles build flavors, which are updated in this patch to adhere. Additionally, this patch harmonizes buffer sketch helpers to call their clear method "clear" rather than a mix of "clear" and "close". (Others were already using "clear".) --- .../hll/HllSketchMergeBufferAggregator.java | 2 +- .../HllSketchMergeBufferAggregatorHelper.java | 2 +- .../hll/HllSketchMergeVectorAggregator.java | 2 +- .../theta/SketchBufferAggregator.java | 2 +- .../theta/SketchBufferAggregatorHelper.java | 4 ++-- .../theta/SketchVectorAggregator.java | 2 +- .../ArrayOfDoublesSketchBuildAggregator.java | 16 ++++++++++----- ...yOfDoublesSketchBuildBufferAggregator.java | 6 ++++-- .../query/aggregation/AggregatorAdapters.java | 20 +++++++++---------- .../query/aggregation/BufferAggregator.java | 6 +++++- .../query/aggregation/VectorAggregator.java | 6 +++++- .../AbstractBufferHashGrouper.java | 2 +- .../epinephelinae/BufferArrayGrouper.java | 3 ++- .../epinephelinae/BufferHashGrouper.java | 1 + .../epinephelinae/HashVectorGrouper.java | 3 ++- .../LimitedBufferHashGrouper.java | 1 + .../timeseries/TimeseriesQueryEngine.java | 6 +++--- .../druid/query/topn/BaseTopNAlgorithm.java | 6 +++--- .../query/topn/HeapBasedTopNAlgorithm.java | 2 +- .../druid/query/topn/PooledTopNAlgorithm.java | 2 +- .../topn/TimeExtractionTopNAlgorithm.java | 2 +- .../epinephelinae/HashVectorGrouperTest.java | 2 +- 22 files changed, 58 insertions(+), 40 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 8e3bfffa073..0458b5084c4 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -75,7 +75,7 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java index 22653019772..1fa9ee4c9a3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -142,7 +142,7 @@ public class HllSketchMergeBufferAggregatorHelper } } - public void close() + public void clear() { unions.clear(); memCache.clear(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java index 5fec9b94ba2..31ad26cb5d7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -102,7 +102,7 @@ public class HllSketchMergeVectorAggregator implements VectorAggregator @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 34aae3f36e1..60d83f4e0a7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -85,7 +85,7 @@ public class SketchBufferAggregator implements BufferAggregator @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java index 49856c9e80d..e2f699012a1 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java @@ -95,7 +95,7 @@ final class SketchBufferAggregatorHelper /** * Returns a {@link Union} associated with a particular buffer location. * - * The Union object will be cached in this helper until {@link #close()} is called. + * The Union object will be cached in this helper until {@link #clear()} is called. */ public Union getOrCreateUnion(ByteBuffer buf, int position) { @@ -122,7 +122,7 @@ final class SketchBufferAggregatorHelper return union; } - public void close() + public void clear() { unions.clear(); memCache.clear(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index a862265d561..7d10bc30fb5 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -107,6 +107,6 @@ public class SketchVectorAggregator implements VectorAggregator @Override public void close() { - helper.close(); + helper.clear(); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java index 7ca1061889d..b093e730f0b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java @@ -27,7 +27,6 @@ import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; - import java.nio.ByteBuffer; import java.util.LinkedHashMap; import java.util.List; @@ -48,6 +47,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator @Nullable private ArrayOfDoublesUpdatableSketch sketch; + private final int nominalEntries; private final boolean canLookupUtf8; private final boolean canCacheById; private final LinkedHashMap stringCache = new LinkedHashMap() @@ -67,10 +67,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator { this.keySelector = keySelector; this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]); - values = new double[valueSelectors.size()]; - sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) - .setNumberOfValues(valueSelectors.size()).build(); - + this.nominalEntries = nominalEntries; this.canCacheById = this.keySelector.nameLookupPossibleInAdvance(); this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8(); } @@ -83,6 +80,15 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator @Override public void aggregate() { + if (values == null) { + values = new double[valueSelectors.length]; + } + + if (sketch == null) { + sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) + .setNumberOfValues(valueSelectors.length).build(); + } + final IndexedInts keys = keySelector.getRow(); for (int i = 0; i < valueSelectors.length; i++) { if (valueSelectors[i].isNull()) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java index 18906d12936..b925220c89f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java @@ -73,8 +73,6 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]); this.nominalEntries = nominalEntries; this.maxIntermediateSize = maxIntermediateSize; - values = new double[valueSelectors.size()]; - this.canCacheById = this.keySelector.nameLookupPossibleInAdvance(); this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8(); } @@ -92,6 +90,10 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat @Override public void aggregate(final ByteBuffer buf, final int position) { + if (values == null) { + values = new double[valueSelectors.length]; + } + for (int i = 0; i < valueSelectors.length; i++) { if (valueSelectors[i].isNull()) { return; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java index 8ae7a33b08d..25c9102bcf7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java @@ -26,7 +26,6 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; -import java.io.Closeable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -42,7 +41,7 @@ import java.util.stream.Collectors; * (2) Query engines are freed from the need to manage how much space each individual aggregator needs. They only * need to allocate a block of size "spaceNeeded". */ -public class AggregatorAdapters implements Closeable +public class AggregatorAdapters { private static final Logger log = new Logger(AggregatorAdapters.class); @@ -230,14 +229,14 @@ public class AggregatorAdapters implements Closeable } /** - * Close all of our aggregators. + * Reset all of our aggregators, releasing resources held by them. After this, this instance may be reused or + * it may be discarded. */ - @Override - public void close() + public void reset() { for (Adapter adapter : adapters) { try { - adapter.close(); + adapter.reset(); } catch (Exception e) { log.warn(e, "Could not close aggregator [%s], skipping.", adapter.getFactory().getName()); @@ -250,7 +249,7 @@ public class AggregatorAdapters implements Closeable * BufferAggregator and VectorAggregator. Private, since it doesn't escape this class and the * only two implementations are private static classes below. */ - private interface Adapter extends Closeable + private interface Adapter { void init(ByteBuffer buf, int position); @@ -259,8 +258,7 @@ public class AggregatorAdapters implements Closeable void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer); - @Override - void close(); + void reset(); AggregatorFactory getFactory(); @@ -293,7 +291,7 @@ public class AggregatorAdapters implements Closeable } @Override - public void close() + public void reset() { aggregator.close(); } @@ -352,7 +350,7 @@ public class AggregatorAdapters implements Closeable } @Override - public void close() + public void reset() { aggregator.close(); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index e9fdbeaa061..20d13491b0f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -158,7 +158,11 @@ public interface BufferAggregator extends HotLoopCallee } /** - * Release any resources used by the aggregator + * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling + * {@link #init(ByteBuffer, int)} followed by other methods as normal. + * + * This call would be more properly named "reset", but we use the name "close" to improve compatibility with + * existing aggregator implementations in extensions. */ void close(); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java index befff12ba6e..a3e506e59c8 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java @@ -83,7 +83,11 @@ public interface VectorAggregator } /** - * Release any resources used by the aggregator. + * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling + * {@link #init(ByteBuffer, int)} followed by other methods as normal. + * + * This call would be more properly named "reset", but we use the name "close" to improve compatibility with + * existing aggregator implementations in extensions. */ void close(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index f3bc195dcbd..70cf5832cf3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -170,7 +170,7 @@ public abstract class AbstractBufferHashGrouper implements Grouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper hashTable.reset(); keySerde.reset(); offsetHeap.reset(); + aggregators.reset(); heapIndexUpdater.setHashTableBuffer(hashTable.getTableBuffer()); hasIterated = false; offsetHeapIterableSize = 0; diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java index 7ae290dd7d4..c5e83b84e87 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java @@ -164,9 +164,9 @@ public class TimeseriesQueryEngine } final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - final AggregatorAdapters aggregators = closer.register( - AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs()) - ); + final AggregatorAdapters aggregators = + AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs()); + closer.register(aggregators::reset); final ResourceHolder bufferHolder = closer.register(bufferPool.take()); diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 843d248221e..f34464a49d0 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -120,7 +120,7 @@ public abstract class BaseTopNAlgorithm stringMap) + protected void resetAggregators(Map stringMap) { for (Aggregator[] aggregators : stringMap.values()) { for (Aggregator agg : aggregators) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java index d5a863a7542..fd0314a607c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java @@ -45,7 +45,7 @@ public class HashVectorGrouperTest ); grouper.initVectorized(512); grouper.close(); - Mockito.verify(aggregatorAdapters, Mockito.times(1)).close(); + Mockito.verify(aggregatorAdapters, Mockito.times(2)).reset(); } @Test