diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 4f7f4cdcd04..145ffcf2e85 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -298,6 +298,12 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory return Collections.singletonList(fieldName); } + @Override + public int guessAggregatorHeapFootprint(long rows) + { + return DoublesSketch.getUpdatableStorageBytes(k, rows); + } + // Quantiles sketches never stop growing, but they do so very slowly. // This size must suffice for overwhelming majority of sketches, // but some sketches may request more memory on heap and move there diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 7a10a169bbb..a57c547cd1b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.datasketches.Family; import org.apache.datasketches.Util; import org.apache.datasketches.theta.SetOperation; @@ -48,6 +49,13 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory { public static final int DEFAULT_MAX_SKETCH_SIZE = 16384; + // Smallest number of entries in an Aggregator. Each entry is a long. Based on the constructor of + // HeapQuickSelectSketch and used by guessAggregatorHeapFootprint. + private static final int MIN_ENTRIES_PER_AGGREGATOR = 1 << Util.MIN_LG_ARR_LONGS; + + // Largest preamble size for the sketch stored in an Aggregator, in bytes. Based on Util.getMaxUnionBytes. + private static final int LONGEST_POSSIBLE_PREAMBLE_BYTES = Family.UNION.getMaxPreLongs() << 3; + protected final String name; protected final String fieldName; protected final int size; @@ -170,6 +178,23 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory return size; } + @Override + public int guessAggregatorHeapFootprint(long rows) + { + final int maxEntries = size * 2; + final int expectedEntries; + + if (rows > maxEntries) { + expectedEntries = maxEntries; + } else { + // rows is within int range since it's <= maxEntries, so casting is OK. + expectedEntries = Math.max(MIN_ENTRIES_PER_AGGREGATOR, Util.ceilingPowerOf2(Ints.checkedCast(rows))); + } + + // 8 bytes per entry + largest possible preamble. + return Long.BYTES * expectedEntries + LONGEST_POSSIBLE_PREAMBLE_BYTES; + } + @Override public int getMaxIntermediateSize() { diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java index cf1f4f95e80..d4867c7a19d 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java @@ -83,6 +83,21 @@ public class DoublesSketchAggregatorFactoryTest Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_MAX_STREAM_LENGTH, factory.getMaxStreamLength()); } + @Test + public void testGuessAggregatorHeapFootprint() + { + DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory( + "myFactory", + "myField", + 128, + null + ); + Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1)); + Assert.assertEquals(1056, factory.guessAggregatorHeapFootprint(100)); + Assert.assertEquals(4128, factory.guessAggregatorHeapFootprint(1000)); + Assert.assertEquals(34848, factory.guessAggregatorHeapFootprint(1_000_000_000_000L)); + } + @Test public void testMaxIntermediateSize() { diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java index bd495ce0d66..61efd5fb665 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java @@ -36,6 +36,31 @@ import org.junit.Test; public class SketchAggregatorFactoryTest { + private static final SketchMergeAggregatorFactory AGGREGATOR_16384 = + new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null); + + private static final SketchMergeAggregatorFactory AGGREGATOR_32768 = + new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null); + + @Test + public void testGuessAggregatorHeapFootprint() + { + Assert.assertEquals(288, AGGREGATOR_16384.guessAggregatorHeapFootprint(1)); + Assert.assertEquals(1056, AGGREGATOR_16384.guessAggregatorHeapFootprint(100)); + Assert.assertEquals(262176, AGGREGATOR_16384.guessAggregatorHeapFootprint(1_000_000_000_000L)); + + Assert.assertEquals(288, AGGREGATOR_32768.guessAggregatorHeapFootprint(1)); + Assert.assertEquals(1056, AGGREGATOR_32768.guessAggregatorHeapFootprint(100)); + Assert.assertEquals(524320, AGGREGATOR_32768.guessAggregatorHeapFootprint(1_000_000_000_000L)); + } + + @Test + public void testMaxIntermediateSize() + { + Assert.assertEquals(262176, AGGREGATOR_16384.getMaxIntermediateSize()); + Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize()); + } + @Test public void testResultArraySignature() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java index f0e383740c8..45f9328532c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java @@ -302,6 +302,23 @@ public abstract class AggregatorFactory implements Cacheable return getMaxIntermediateSize(); } + /** + * Returns a best guess as to how much memory the on-heap {@link Aggregator} returned by {@link #factorize} will + * require when a certain number of rows have been aggregated into it. + * + * The main user of this method is {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex}, which + * uses it to determine when to persist the current in-memory data to disk. + * + * Important note for callers! In nearly all cases, callers that wish to constrain memory would be better off + * using {@link #factorizeBuffered} or {@link #factorizeVector}, which offer precise control over how much memory + * is being used. + */ + public int guessAggregatorHeapFootprint(long rows) + { + // By default, guess that on-heap footprint is equal to off-heap footprint. + return getMaxIntermediateSizeWithNulls(); + } + /** * Return a potentially optimized form of this AggregatorFactory for per-segment queries. */ diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 8ebbeaf9a67..bb24ae4cadd 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -44,7 +44,6 @@ import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +58,15 @@ import java.util.concurrent.atomic.AtomicLong; public class OnheapIncrementalIndex extends IncrementalIndex { private static final Logger log = new Logger(OnheapIncrementalIndex.class); + + /** + * Constant factor provided to {@link AggregatorFactory#guessAggregatorHeapFootprint(long)} for footprint estimates. + * This figure is large enough to catch most common rollup ratios, but not so large that it will cause persists to + * happen too often. If an actual workload involves a much higher rollup ratio, then this may lead to excessive + * heap usage. Users would have to work around that by lowering maxRowsInMemory or maxBytesInMemory. + */ + private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100; + /** * overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object */ @@ -113,11 +121,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex */ private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema) { + final long rowsPerAggregator = + incrementalIndexSchema.isRollup() ? ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION : 1; + long maxAggregatorIntermediateSize = ((long) Integer.BYTES) * incrementalIndexSchema.getMetrics().length; - maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics()) - .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls() - + Long.BYTES * 2L) - .sum(); + + for (final AggregatorFactory aggregator : incrementalIndexSchema.getMetrics()) { + maxAggregatorIntermediateSize += + (long) aggregator.guessAggregatorHeapFootprint(rowsPerAggregator) + 2L * Long.BYTES; + } + return maxAggregatorIntermediateSize; }