Improve on-heap aggregator footprint estimates. (#11950)

Add a "guessAggregatorHeapFootprint" method to AggregatorFactory that
mitigates #6743 by enabling heap footprint estimates based on a specific
number of rows. The idea is that at ingestion time, the number of rows
that go into an aggregator will be 1 (if rollup is off) or will likely
be a small number (if rollup is on).

It's a heuristic, because of course nothing guarantees that the rollup
ratio is a small number. But it's a common case, and I expect this logic
to go wrong much less often than the current logic. Also, when it does
go wrong, users can fix it by lowering maxRowsInMemory or
maxBytesInMemory. The current situation is unintuitive: when the
estimation goes wrong, users get an OOME, but actually they need to
*raise* these limits to fix it.
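
To make the heuristic concrete (an illustrative snippet, not code from this commit; the factory arguments mirror the tests below):

    // With rollup off, each aggregator receives exactly one row, so the guess can
    // be tight. With rollup on, OnheapIncrementalIndex assumes a modest ratio.
    AggregatorFactory factory = new DoublesSketchAggregatorFactory("myMetric", "myColumn", 128, null);
    int noRollupGuess = factory.guessAggregatorHeapFootprint(1);    // 64 bytes
    int rollupGuess = factory.guessAggregatorHeapFootprint(100);    // 1056 bytes
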
Gian Merlino 2021-11-27 23:51:24 -08:00 committed by GitHub
parent 8eff6334f7
commit 93aeaf4801
6 changed files with 106 additions and 5 deletions

DoublesSketchAggregatorFactory.java

@@ -298,6 +298,12 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
     return Collections.singletonList(fieldName);
   }
 
+  @Override
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    return DoublesSketch.getUpdatableStorageBytes(k, rows);
+  }
+
   // Quantiles sketches never stop growing, but they do so very slowly.
   // This size must suffice for overwhelming majority of sketches,
   // but some sketches may request more memory on heap and move there
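Here the guess delegates to the DataSketches library's own sizing formula for an updatable quantiles sketch, which grows only logarithmically with the row count. A minimal standalone probe (FootprintDemo is an invented name; assumes datasketches-java on the classpath):

    import org.apache.datasketches.quantiles.DoublesSketch;

    public class FootprintDemo
    {
      public static void main(String[] args)
      {
        // For k = 128, prints: 1 -> 64, 100 -> 1056, 1000 -> 4128,
        // 1000000000000 -> 34848 (values asserted by the factory test below).
        for (long rows : new long[]{1, 100, 1000, 1_000_000_000_000L}) {
          System.out.println(rows + " -> " + DoublesSketch.getUpdatableStorageBytes(128, rows));
        }
      }
    }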

SketchAggregatorFactory.java

@@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.theta;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import org.apache.datasketches.Family;
 import org.apache.datasketches.Util;
 import org.apache.datasketches.theta.SetOperation;
@@ -48,6 +49,13 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
 {
   public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;
 
+  // Smallest number of entries in an Aggregator. Each entry is a long. Based on the constructor of
+  // HeapQuickSelectSketch and used by guessAggregatorHeapFootprint.
+  private static final int MIN_ENTRIES_PER_AGGREGATOR = 1 << Util.MIN_LG_ARR_LONGS;
+
+  // Largest preamble size for the sketch stored in an Aggregator, in bytes. Based on Util.getMaxUnionBytes.
+  private static final int LONGEST_POSSIBLE_PREAMBLE_BYTES = Family.UNION.getMaxPreLongs() << 3;
+
   protected final String name;
   protected final String fieldName;
   protected final int size;
@@ -170,6 +178,23 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
     return size;
   }
 
+  @Override
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    final int maxEntries = size * 2;
+    final int expectedEntries;
+
+    if (rows > maxEntries) {
+      expectedEntries = maxEntries;
+    } else {
+      // rows is within int range since it's <= maxEntries, so casting is OK.
+      expectedEntries = Math.max(MIN_ENTRIES_PER_AGGREGATOR, Util.ceilingPowerOf2(Ints.checkedCast(rows)));
+    }
+
+    // 8 bytes per entry + largest possible preamble.
+    return Long.BYTES * expectedEntries + LONGEST_POSSIBLE_PREAMBLE_BYTES;
+  }
+
   @Override
   public int getMaxIntermediateSize()
   {
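Sanity-checking the arithmetic against the tests below (the constant values are inferred from those asserted results, not spelled out in this diff): the asserted footprints imply MIN_ENTRIES_PER_AGGREGATOR = 32 and LONGEST_POSSIBLE_PREAMBLE_BYTES = 32. For size = 16384: rows = 1 gives max(32, 1) = 32 entries, so 8 * 32 + 32 = 288 bytes; rows = 100 rounds up to 128 entries, so 8 * 128 + 32 = 1056 bytes; and a trillion rows clamps to maxEntries = 32768, so 8 * 32768 + 32 = 262176 bytes, which matches getMaxIntermediateSize() exactly.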

DoublesSketchAggregatorFactoryTest.java

@@ -83,6 +83,21 @@ public class DoublesSketchAggregatorFactoryTest
     Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_MAX_STREAM_LENGTH, factory.getMaxStreamLength());
   }
 
+  @Test
+  public void testGuessAggregatorHeapFootprint()
+  {
+    DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        128,
+        null
+    );
+
+    Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, factory.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(4128, factory.guessAggregatorHeapFootprint(1000));
+    Assert.assertEquals(34848, factory.guessAggregatorHeapFootprint(1_000_000_000_000L));
+  }
+
   @Test
   public void testMaxIntermediateSize()
   {

SketchAggregatorFactoryTest.java

@@ -36,6 +36,31 @@ import org.junit.Test;
 
 public class SketchAggregatorFactoryTest
 {
+  private static final SketchMergeAggregatorFactory AGGREGATOR_16384 =
+      new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null);
+
+  private static final SketchMergeAggregatorFactory AGGREGATOR_32768 =
+      new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null);
+
+  @Test
+  public void testGuessAggregatorHeapFootprint()
+  {
+    Assert.assertEquals(288, AGGREGATOR_16384.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, AGGREGATOR_16384.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(262176, AGGREGATOR_16384.guessAggregatorHeapFootprint(1_000_000_000_000L));
+
+    Assert.assertEquals(288, AGGREGATOR_32768.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, AGGREGATOR_32768.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(524320, AGGREGATOR_32768.guessAggregatorHeapFootprint(1_000_000_000_000L));
+  }
+
+  @Test
+  public void testMaxIntermediateSize()
+  {
+    Assert.assertEquals(262176, AGGREGATOR_16384.getMaxIntermediateSize());
+    Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
+  }
+
   @Test
   public void testResultArraySignature()
   {
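Note how both sizes produce identical guesses at 1 and 100 rows: the configured size only determines maxEntries (the clamp), so small row counts cost the same regardless of sketch size, and the worst-case figure is only charged once the row guess exceeds 2 * size.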

AggregatorFactory.java

@@ -302,6 +302,23 @@ public abstract class AggregatorFactory implements Cacheable
     return getMaxIntermediateSize();
   }
 
+  /**
+   * Returns a best guess as to how much memory the on-heap {@link Aggregator} returned by {@link #factorize} will
+   * require when a certain number of rows have been aggregated into it.
+   *
+   * The main user of this method is {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex}, which
+   * uses it to determine when to persist the current in-memory data to disk.
+   *
+   * Important note for callers! In nearly all cases, callers that wish to constrain memory would be better off
+   * using {@link #factorizeBuffered} or {@link #factorizeVector}, which offer precise control over how much memory
+   * is being used.
+   */
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    // By default, guess that on-heap footprint is equal to off-heap footprint.
+    return getMaxIntermediateSizeWithNulls();
+  }
+
   /**
    * Return a potentially optimized form of this AggregatorFactory for per-segment queries.
   */
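The default is intentionally conservative: it reuses the off-heap maximum, so existing factories behave exactly as before unless they opt in. A factory whose aggregator grows with input could override it along these lines (a hypothetical sketch inside a custom AggregatorFactory subclass; the 24-bytes-per-row figure is invented for illustration):

    @Override
    public int guessAggregatorHeapFootprint(long rows)
    {
      // Hypothetical: assume ~24 bytes of heap per aggregated row, capped at the
      // worst-case off-heap size so the guess never exceeds the old estimate.
      return (int) Math.min(24L * rows, getMaxIntermediateSizeWithNulls());
    }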

OnheapIncrementalIndex.java

@@ -44,7 +44,6 @@ import org.apache.druid.utils.JvmUtils;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +58,15 @@ import java.util.concurrent.atomic.AtomicLong;
 public class OnheapIncrementalIndex extends IncrementalIndex
 {
   private static final Logger log = new Logger(OnheapIncrementalIndex.class);
+
+  /**
+   * Constant factor provided to {@link AggregatorFactory#guessAggregatorHeapFootprint(long)} for footprint estimates.
+   * This figure is large enough to catch most common rollup ratios, but not so large that it will cause persists to
+   * happen too often. If an actual workload involves a much higher rollup ratio, then this may lead to excessive
+   * heap usage. Users would have to work around that by lowering maxRowsInMemory or maxBytesInMemory.
+   */
+  private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100;
+
   /**
    * overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object
    */
@@ -113,11 +121,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex
    */
   private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema)
   {
+    final long rowsPerAggregator =
+        incrementalIndexSchema.isRollup() ? ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION : 1;
+
     long maxAggregatorIntermediateSize = ((long) Integer.BYTES) * incrementalIndexSchema.getMetrics().length;
-    maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
-                                           .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls()
-                                                                    + Long.BYTES * 2L)
-                                           .sum();
+
+    for (final AggregatorFactory aggregator : incrementalIndexSchema.getMetrics()) {
+      maxAggregatorIntermediateSize +=
+          (long) aggregator.guessAggregatorHeapFootprint(rowsPerAggregator) + 2L * Long.BYTES;
+    }
+
     return maxAggregatorIntermediateSize;
   }
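Putting the pieces together, a worked example (the numbers come from the sketch tests above; the scenario is illustrative, not part of the commit): with rollup enabled and one theta sketch aggregator of size = 16384, the estimate is Integer.BYTES + guessAggregatorHeapFootprint(100) + 2 * Long.BYTES = 4 + 1056 + 16 = 1076 bytes per row, where the previous stream-based code charged 4 + 262176 + 16 = 262196 bytes per row: roughly a 240x smaller charge against maxBytesInMemory.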