Use lookup memory footprint in MSQ memory computations. (#13271)

* Use lookup memory footprint in MSQ memory computations.

Two main changes:

1) Add estimateHeapFootprint to LookupExtractor.

2) Use this in MSQ's IndexerWorkerContext when determining the total
   amount of available memory. It's taken off the top.

This prevents MSQ tasks from running out of memory when there are lookups
defined in the cluster.

* Updates from code review.
This commit is contained in:
Gian Merlino 2022-11-03 07:36:54 -07:00 committed by GitHub
parent c5fcc03bdf
commit d1877e41ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 218 additions and 35 deletions

View File

@ -253,8 +253,9 @@ Increasing the amount of available memory can improve performance in certain cas
Worker tasks use both JVM heap memory and off-heap ("direct") memory.
On Peons launched by Middle Managers, the bulk of the JVM heap (75%) is split up into two bundles of equal size: one
processor bundle and one worker bundle. Each one comprises 37.5% of the available JVM heap.
On Peons launched by Middle Managers, the bulk of the JVM heap (75%, less any space used by
[lookups](../querying/lookups.md)) is split up into two bundles of equal size: one processor bundle and one worker
bundle. Each one comprises 37.5% of the available JVM heap, less any space used by [lookups](../querying/lookups.md).
Depending on the type of query, each worker and controller task can use a sketch for generating partition boundaries.
Each sketch uses at most approximately 300 MB.

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import java.lang.ref.WeakReference;
@ -132,26 +133,17 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa
void monitor(ServiceEmitter serviceEmitter)
{
long numEntries = 0;
long size = 0;
long heapSizeInBytes = 0;
expungeCollectedCaches();
for (WeakReference<Map<String, String>> cacheRef : caches) {
final Map<String, String> cache = cacheRef.get();
if (cache == null) {
continue;
}
if (cache != null) {
numEntries += cache.size();
for (Map.Entry<String, String> sEntry : cache.entrySet()) {
final String key = sEntry.getKey();
final String value = sEntry.getValue();
if (key == null || value == null) {
LOG.debug("Missing entries for cache key");
continue;
}
size += key.length() + value.length();
heapSizeInBytes += MapLookupExtractor.estimateHeapFootprint(cache);
}
}
// Each String object has ~40 bytes of overhead, and x 2 for key and value strings
long heapSizeInBytes = (80 * numEntries) + size * Character.BYTES;
serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/numEntries", numEntries));
serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/heapSizeInBytes", heapSizeInBytes));
}

View File

@ -215,6 +215,26 @@ public class PollingLookup extends LookupExtractor
};
}
/**
 * Estimated heap footprint of the polling cache currently backing this lookup.
 *
 * Returns 0 when {@code refOfCacheKeeper} holds no keeper (the lookup has been closed). Otherwise delegates to
 * {@link PollingCache#estimateHeapFootprint()} on the cache handed out by the keeper.
 */
@Override
public long estimateHeapFootprint()
{
  while (true) {
    final CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get();
    if (cacheRefKeeper == null) {
      // Closed.
      return 0;
    }

    final PollingCache<?, ?> cache = cacheRefKeeper.getAndIncrementRef();
    if (cache == null) {
      // The keeper did not hand out a cache; re-read the keeper and try again.
      continue;
    }

    try {
      return cache.estimateHeapFootprint();
    }
    finally {
      // getAndIncrementRef() took a reference on the cache; give it back so the keeper's count does not grow on
      // every footprint estimation.
      // NOTE(review): assumes CacheRefKeeper#doneWithIt() is the matching release call -- confirm against
      // CacheRefKeeper.
      cacheRefKeeper.doneWithIt();
    }
  }
}
@Override
public int hashCode()
{

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.lookup.cache.polling;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.druid.query.extraction.MapLookupExtractor;
import java.util.Collections;
import java.util.List;
@ -83,6 +84,19 @@ public class OnHeapPollingCache<K, V> implements PollingCache<K, V>
return listOfKeys;
}
/**
 * Estimated heap footprint of the backing map.
 *
 * Only maps whose keys and values are all Strings are measured; if any entry has a non-String key or value,
 * the estimate is 0.
 */
@Override
@SuppressWarnings("unchecked")
public long estimateHeapFootprint()
{
  final boolean stringsOnly =
      immutableMap.entrySet()
                  .stream()
                  .allMatch(e -> e.getKey() instanceof String && e.getValue() instanceof String);

  if (!stringsOnly) {
    return 0;
  }

  // Every key and value was verified to be a String above, so viewing the map as Map<String, String> is safe.
  return MapLookupExtractor.estimateHeapFootprint((Map<String, String>) immutableMap);
}
@Override
public void close()
{

View File

@ -41,4 +41,12 @@ public interface PollingCache<K, V>
* close and clean the resources used by the cache
*/
void close();
/**
 * Estimated number of heap bytes held by this cache.
 *
 * The default implementation reports zero; implementations that can measure themselves should override it.
 *
 * @return estimated heap footprint in bytes
 */
default long estimateHeapFootprint()
{
  return 0L;
}
}

View File

@ -223,6 +223,15 @@ public class PollingLookupTest extends InitializedNullHandlingTest
pollingLookup.keySet();
}
@Test
public void testEstimateHeapFootprint()
{
  // The off-heap cache provider is expected to report no heap usage; otherwise the test fixture is expected
  // to be estimated at 402 bytes.
  final long expectedFootprint;
  if (pollingCacheFactory instanceof OffHeapPollingCache.OffHeapPollingCacheProvider) {
    expectedFootprint = 0L;
  } else {
    expectedFootprint = 402L;
  }

  Assert.assertEquals(expectedFootprint, pollingLookup.estimateHeapFootprint());
}
private void assertMapLookup(Map<String, String> map, LookupExtractor lookup)
{
for (Map.Entry<String, String> entry : map.entrySet()) {

View File

@ -44,6 +44,9 @@ import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.rpc.CoordinatorServiceClient;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
@ -71,6 +74,7 @@ public class IndexerWorkerContext implements WorkerContext
private final IndexIO indexIO;
private final TaskDataSegmentProvider dataSegmentProvider;
private final ServiceClientFactory clientFactory;
private final long availableHeapMemory;
@GuardedBy("this")
private OverlordClient overlordClient;
@ -83,7 +87,8 @@ public class IndexerWorkerContext implements WorkerContext
final Injector injector,
final IndexIO indexIO,
final TaskDataSegmentProvider dataSegmentProvider,
final ServiceClientFactory clientFactory
final ServiceClientFactory clientFactory,
final long availableHeapMemory
)
{
this.toolbox = toolbox;
@ -91,6 +96,7 @@ public class IndexerWorkerContext implements WorkerContext
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
this.clientFactory = clientFactory;
this.availableHeapMemory = availableHeapMemory;
}
public static IndexerWorkerContext createProductionInstance(final TaskToolbox toolbox, final Injector injector)
@ -109,7 +115,8 @@ public class IndexerWorkerContext implements WorkerContext
injector,
indexIO,
new TaskDataSegmentProvider(coordinatorServiceClient, segmentCacheManager, indexIO),
serviceClientFactory
serviceClientFactory,
computeAvailableHeapMemory(injector)
);
}
@ -227,17 +234,6 @@ public class IndexerWorkerContext implements WorkerContext
@Override
public FrameContext frameContext(QueryDefinition queryDef, int stageNumber)
{
final int numWorkersInJvm;
// Determine the max number of workers in JVM for memory allocations.
if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) {
// CliIndexer
numWorkersInJvm = injector.getInstance(WorkerConfig.class).getCapacity();
} else {
// CliPeon
numWorkersInJvm = 1;
}
final IntSet inputStageNumbers =
InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs());
final int numInputWorkers =
@ -250,8 +246,8 @@ public class IndexerWorkerContext implements WorkerContext
indexIO,
dataSegmentProvider,
WorkerMemoryParameters.compute(
Runtime.getRuntime().maxMemory(),
numWorkersInJvm,
availableHeapMemory,
computeNumWorkersInJvm(),
processorBouncer().getMaxCount(),
numInputWorkers
)
@ -276,6 +272,20 @@ public class IndexerWorkerContext implements WorkerContext
return injector.getInstance(Bouncer.class);
}
/**
 * How many workers may share this JVM, counting the current one.
 *
 * @return the configured {@link WorkerConfig} capacity when the toolbox's appenderators manager is a
 * {@link UnifiedIndexerAppenderatorsManager} (CliIndexer); otherwise 1 (CliPeon)
 */
private int computeNumWorkersInJvm()
{
  final boolean runningInIndexer =
      toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager;

  // CliIndexer: capacity from WorkerConfig. CliPeon: a single worker.
  return runningInIndexer ? injector.getInstance(WorkerConfig.class).getCapacity() : 1;
}
private synchronized OverlordClient makeOverlordClient()
{
if (overlordClient == null) {
@ -293,4 +303,46 @@ public class IndexerWorkerContext implements WorkerContext
return controllerLocator;
}
/**
 * Heap memory budget available to us: the JVM's maximum memory minus the estimated footprint of all lookups.
 *
 * @param injector used to locate the lookup manager
 * @return available heap memory in bytes
 */
private static long computeAvailableHeapMemory(final Injector injector)
{
  final long jvmMaxMemory = Runtime.getRuntime().maxMemory();
  final long lookupFootprint = computeTotalLookupFootprint(injector);
  return jvmMaxMemory - lookupFootprint;
}
/**
 * Sums {@link LookupExtractor#estimateHeapFootprint()} over every lookup known to the
 * {@link LookupReferencesManager}. Lookups whose extractor cannot be obtained or measured are logged and skipped.
 *
 * @param injector used to obtain the {@link LookupReferencesManager}
 * @return total estimated lookup footprint in bytes
 */
private static long computeTotalLookupFootprint(final Injector injector)
{
  // This figure is later subtracted from available memory, which is only correct if lookups have been loaded
  // *before* this runs. That is the typical mode of operation, since by default
  // druid.lookup.enableLookupSyncOnStartup = true.
  final LookupReferencesManager lookupManager = injector.getInstance(LookupReferencesManager.class);

  long totalBytes = 0;
  int countedLookups = 0;

  for (final String lookupName : lookupManager.getAllLookupNames()) {
    final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null);
    if (container == null) {
      // Name was listed but no container is available for it; nothing to measure.
      continue;
    }

    try {
      totalBytes += container.getLookupExtractorFactory().get().estimateHeapFootprint();
      countedLookups++;
    }
    catch (Exception e) {
      // Best-effort estimation: one failing lookup must not prevent the others from being counted.
      log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName);
    }
  }

  log.debug("Lookup footprint: %d lookups with %,d total bytes.", countedLookups, totalBytes);
  return totalBytes;
}
}

View File

@ -35,7 +35,6 @@ import java.util.Collections;
public class IndexerWorkerContextTest
{
private IndexerWorkerContext indexerWorkerContext = null;
@Before
@ -50,7 +49,8 @@ public class IndexerWorkerContextTest
injectorMock,
null,
null,
null
null,
Runtime.getRuntime().maxMemory()
);
}

View File

@ -155,7 +155,8 @@ public class MSQTestWorkerContext implements WorkerContext
injector,
indexIO,
null,
null
null,
Runtime.getRuntime().maxMemory()
),
indexIO,
injector.getInstance(DataSegmentProvider.class),

View File

@ -41,6 +41,9 @@ import java.util.stream.Collectors;
@JsonTypeName("map")
public class MapLookupExtractor extends LookupExtractor
{
// Each String object has ~40 bytes of overhead, and x 2 for key and value strings
private static final long HEAP_ENTRY_OVERHEAD = 80;
private final Map<String, String> map;
private final boolean isOneToOne;
@ -55,6 +58,37 @@ public class MapLookupExtractor extends LookupExtractor
this.isOneToOne = isOneToOne;
}
/**
 * Estimate the heap footprint of a Map.
 *
 * The estimate is a fixed per-entry overhead plus {@link Character#BYTES} for every character of each String
 * key and String value. Any kind of Map is accepted, but keys and values that are not Strings (including
 * nulls) contribute no characters; their entries still incur the per-entry overhead.
 *
 * @param map map to measure; may be null
 * @return estimated footprint in bytes, or 0 for a null map
 */
public static <K, V> long estimateHeapFootprint(@Nullable final Map<K, V> map)
{
  if (map == null) {
    return 0;
  }

  final long entryOverhead = HEAP_ENTRY_OVERHEAD * map.size();

  long charCount = 0;
  for (final Map.Entry<K, V> entry : map.entrySet()) {
    final Object k = entry.getKey();
    final Object v = entry.getValue();
    charCount += (k instanceof String) ? ((String) k).length() : 0;
    charCount += (v instanceof String) ? ((String) v).length() : 0;
  }

  return entryOverhead + charCount * Character.BYTES;
}
@JsonProperty
public Map<String, String> getMap()
{
@ -147,6 +181,12 @@ public class MapLookupExtractor extends LookupExtractor
return Collections.unmodifiableSet(map.keySet());
}
/**
 * Estimated heap footprint of the map backing this extractor, as computed by
 * {@link #estimateHeapFootprint(Map)}.
 */
@Override
public long estimateHeapFootprint()
{
  return MapLookupExtractor.estimateHeapFootprint(this.map);
}
@Override
public boolean equals(Object o)
{

View File

@ -137,4 +137,16 @@ public abstract class LookupExtractor
{
return false;
}
/**
 * Best-effort estimate of the heap bytes used by this extractor, intended for memory management and monitoring.
 *
 * The value is not guaranteed to be accurate: this base implementation, and therefore any subclass that does
 * not override it, reports zero even if on-heap structures are in use. The most common class,
 * {@link MapLookupExtractor}, does provide a reasonable implementation.
 *
 * @return estimated heap footprint in bytes
 */
public long estimateHeapFootprint()
{
  return 0L;
}
}

View File

@ -29,6 +29,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -105,6 +106,39 @@ public class MapLookupExtractorTest
);
}
@Test
public void testEstimateHeapFootprint()
{
  // An extractor over an empty map has nothing to account for.
  final MapLookupExtractor emptyExtractor = new MapLookupExtractor(Collections.emptyMap(), false);
  Assert.assertEquals(0L, emptyExtractor.estimateHeapFootprint());

  // The shared fixture map is expected to be estimated at 388 bytes.
  final MapLookupExtractor populatedExtractor = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap), false);
  Assert.assertEquals(388L, populatedExtractor.estimateHeapFootprint());
}
@Test
public void testEstimateHeapFootprintStatic()
{
  // Null and empty maps both estimate to zero.
  final long nullMapFootprint = MapLookupExtractor.estimateHeapFootprint(null);
  Assert.assertEquals(0L, nullMapFootprint);

  final long emptyMapFootprint = MapLookupExtractor.estimateHeapFootprint(Collections.emptyMap());
  Assert.assertEquals(0L, emptyMapFootprint);

  // Same fixture and expectation as the instance-method test.
  final long fixtureFootprint = MapLookupExtractor.estimateHeapFootprint(ImmutableMap.copyOf(lookupMap));
  Assert.assertEquals(388L, fixtureFootprint);
}
@Test
public void testEstimateHeapFootprintStaticNullKeysAndValues()
{
  final Map<String, String> mapWithNullKeysAndNullValues = new HashMap<>();
  mapWithNullKeysAndNullValues.put("foo", "bar");
  mapWithNullKeysAndNullValues.put("foo2", null);

  // Null value: 2 entries * 80 bytes overhead + (3 + 3 + 4) chars * 2 bytes = 180.
  Assert.assertEquals(180L, MapLookupExtractor.estimateHeapFootprint(mapWithNullKeysAndNullValues));

  // Null key: the test name promises null-key coverage, so exercise it as well (HashMap permits one null key).
  // 3 entries * 80 bytes overhead + (3 + 3 + 4 + 3) chars * 2 bytes = 266.
  mapWithNullKeysAndNullValues.put(null, "baz");
  Assert.assertEquals(266L, MapLookupExtractor.estimateHeapFootprint(mapWithNullKeysAndNullValues));
}
@Test
public void testEstimateHeapFootprintStaticNonStringKeysAndValues()
{
  // Neither keys nor values are Strings, so only the per-entry overhead counts: 2 entries * 80 bytes = 160.
  final Map<Long, Object> nonStringMap = new HashMap<>();
  nonStringMap.put(3L, 1);
  nonStringMap.put(4L, 3.2);

  final long footprint = MapLookupExtractor.estimateHeapFootprint(nonStringMap);
  Assert.assertEquals(160L, footprint);
}
@Test
public void testEquals()
{