diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md index f08d733d7ae..0e8dd52312f 100644 --- a/docs/multi-stage-query/concepts.md +++ b/docs/multi-stage-query/concepts.md @@ -253,8 +253,9 @@ Increasing the amount of available memory can improve performance in certain cas Worker tasks use both JVM heap memory and off-heap ("direct") memory. -On Peons launched by Middle Managers, the bulk of the JVM heap (75%) is split up into two bundles of equal size: one -processor bundle and one worker bundle. Each one comprises 37.5% of the available JVM heap. +On Peons launched by Middle Managers, the bulk of the JVM heap (75%, less any space used by +[lookups](../querying/lookups.md)) is split up into two bundles of equal size: one processor bundle and one worker +bundle. Each one comprises 37.5% of the available JVM heap, less any space used by [lookups](../querying/lookups.md). Depending on the type of query, each worker and controller task can use a sketch for generating partition boundaries. Each sketch uses at most approximately 300 MB. diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java index fe013d5e73a..1a1a1306b3a 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig; import java.lang.ref.WeakReference; @@ -132,26 +133,17 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa void monitor(ServiceEmitter serviceEmitter) { long numEntries = 0; - long size = 0; + long heapSizeInBytes = 0; expungeCollectedCaches(); for (WeakReference> cacheRef : caches) { final Map cache = cacheRef.get(); - if (cache == null) { - continue; - } - numEntries += cache.size(); - for (Map.Entry sEntry : cache.entrySet()) { - final String key = sEntry.getKey(); - final String value = sEntry.getValue(); - if (key == null || value == null) { - LOG.debug("Missing entries for cache key"); - continue; - } - size += key.length() + value.length(); + + if (cache != null) { + numEntries += cache.size(); + heapSizeInBytes += MapLookupExtractor.estimateHeapFootprint(cache); } } - // Each String object has ~40 bytes of overhead, and x 2 for key and value strings - long heapSizeInBytes = (80 * numEntries) + size * Character.BYTES; + serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/numEntries", numEntries)); serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/heapSizeInBytes", heapSizeInBytes)); } diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java index 0cfa0768577..a59891254c0 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java @@ -215,6 +215,26 @@ public class PollingLookup extends LookupExtractor }; } + @Override + public long estimateHeapFootprint() + { + PollingCache cache = null; + + while (cache == null) { + final CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get(); + + if (cacheRefKeeper == null) { + // Closed. + return 0; + } + + // If null, we'll do another run through the while loop. + cache = cacheRefKeeper.getAndIncrementRef(); + } + + return cache.estimateHeapFootprint(); + } + @Override public int hashCode() { diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java index daa697c6376..8f93cf39ff6 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java @@ -22,6 +22,7 @@ package org.apache.druid.server.lookup.cache.polling; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import org.apache.druid.query.extraction.MapLookupExtractor; import java.util.Collections; import java.util.List; @@ -83,6 +84,19 @@ public class OnHeapPollingCache implements PollingCache return listOfKeys; } + @Override + @SuppressWarnings("unchecked") + public long estimateHeapFootprint() + { + for (final Map.Entry entry : immutableMap.entrySet()) { + if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof String)) { + return 0; + } + } + + return MapLookupExtractor.estimateHeapFootprint((Map) immutableMap); + } + @Override public void close() { diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/PollingCache.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/PollingCache.java index 01c91d08f8b..befdfd1ffba 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/PollingCache.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/PollingCache.java @@ -41,4 +41,12 @@ public interface PollingCache * close and clean the resources used by the cache */ void close(); + + /** + * Estimated heap footprint of this object. + */ + default long estimateHeapFootprint() + { + return 0; + } } diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupTest.java index 2aabe93ce22..209b86dcde2 100644 --- a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupTest.java +++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupTest.java @@ -223,6 +223,15 @@ public class PollingLookupTest extends InitializedNullHandlingTest pollingLookup.keySet(); } + @Test + public void testEstimateHeapFootprint() + { + Assert.assertEquals( + pollingCacheFactory instanceof OffHeapPollingCache.OffHeapPollingCacheProvider ? 0L : 402L, + pollingLookup.estimateHeapFootprint() + ); + } + private void assertMapLookup(Map map, LookupExtractor lookup) { for (Map.Entry entry : map.entrySet()) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 0536785562a..36479142e08 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -44,6 +44,9 @@ import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.kernel.FrameContext; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.rpc.CoordinatorServiceClient; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupReferencesManager; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.ServiceLocations; import org.apache.druid.rpc.ServiceLocator; @@ -71,6 +74,7 @@ public class IndexerWorkerContext implements WorkerContext private final IndexIO indexIO; private final TaskDataSegmentProvider dataSegmentProvider; private final ServiceClientFactory clientFactory; + private final long availableHeapMemory; @GuardedBy("this") private OverlordClient overlordClient; @@ -83,7 +87,8 @@ public class IndexerWorkerContext implements WorkerContext final Injector injector, final IndexIO indexIO, final TaskDataSegmentProvider dataSegmentProvider, - final ServiceClientFactory clientFactory + final ServiceClientFactory clientFactory, + final long availableHeapMemory ) { this.toolbox = toolbox; @@ -91,6 +96,7 @@ public class IndexerWorkerContext implements WorkerContext this.indexIO = indexIO; this.dataSegmentProvider = dataSegmentProvider; this.clientFactory = clientFactory; + this.availableHeapMemory = availableHeapMemory; } public static IndexerWorkerContext createProductionInstance(final TaskToolbox toolbox, final Injector injector) @@ -109,7 +115,8 @@ public class IndexerWorkerContext implements WorkerContext injector, indexIO, new TaskDataSegmentProvider(coordinatorServiceClient, segmentCacheManager, indexIO), - serviceClientFactory + serviceClientFactory, + computeAvailableHeapMemory(injector) ); } @@ -227,17 +234,6 @@ public class IndexerWorkerContext implements WorkerContext @Override public FrameContext frameContext(QueryDefinition queryDef, int stageNumber) { - final int numWorkersInJvm; - - // Determine the max number of workers in JVM for memory allocations. - if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) { - // CliIndexer - numWorkersInJvm = injector.getInstance(WorkerConfig.class).getCapacity(); - } else { - // CliPeon - numWorkersInJvm = 1; - } - final IntSet inputStageNumbers = InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs()); final int numInputWorkers = @@ -250,8 +246,8 @@ public class IndexerWorkerContext implements WorkerContext indexIO, dataSegmentProvider, WorkerMemoryParameters.compute( - Runtime.getRuntime().maxMemory(), - numWorkersInJvm, + availableHeapMemory, + computeNumWorkersInJvm(), processorBouncer().getMaxCount(), numInputWorkers ) @@ -276,6 +272,20 @@ public class IndexerWorkerContext implements WorkerContext return injector.getInstance(Bouncer.class); } + /** + * Number of workers that may run in the current JVM, including the current worker. + */ + private int computeNumWorkersInJvm() + { + if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) { + // CliIndexer + return injector.getInstance(WorkerConfig.class).getCapacity(); + } else { + // CliPeon + return 1; + } + } + private synchronized OverlordClient makeOverlordClient() { if (overlordClient == null) { @@ -293,4 +303,46 @@ public class IndexerWorkerContext implements WorkerContext return controllerLocator; } + + /** + * Amount of memory available for our usage. + */ + private static long computeAvailableHeapMemory(final Injector injector) + { + return Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector); + } + + /** + * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on + * all available lookups. + */ + private static long computeTotalLookupFootprint(final Injector injector) + { + // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before* + // we create this instance. Luckily, this is the typical mode of operation, since by default + // druid.lookup.enableLookupSyncOnStartup = true. + final LookupReferencesManager lookupManager = injector.getInstance(LookupReferencesManager.class); + + int lookupCount = 0; + long lookupFootprint = 0; + + for (final String lookupName : lookupManager.getAllLookupNames()) { + final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null); + + if (container != null) { + try { + final LookupExtractor extractor = container.getLookupExtractorFactory().get(); + lookupFootprint += extractor.estimateHeapFootprint(); + lookupCount++; + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName); + } + } + } + + log.debug("Lookup footprint: %d lookups with %,d total bytes.", lookupCount, lookupFootprint); + + return lookupFootprint; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 257d9b6aafe..fa2bdb2b7ee 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -35,7 +35,6 @@ import java.util.Collections; public class IndexerWorkerContextTest { - private IndexerWorkerContext indexerWorkerContext = null; @Before @@ -50,7 +49,8 @@ public class IndexerWorkerContextTest injectorMock, null, null, - null + null, + Runtime.getRuntime().maxMemory() ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index 655077008db..3cf03b9f228 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -155,7 +155,8 @@ public class MSQTestWorkerContext implements WorkerContext injector, indexIO, null, - null + null, + Runtime.getRuntime().maxMemory() ), indexIO, injector.getInstance(DataSegmentProvider.class), diff --git a/processing/src/main/java/org/apache/druid/query/extraction/MapLookupExtractor.java b/processing/src/main/java/org/apache/druid/query/extraction/MapLookupExtractor.java index b00161566c8..74d61ceba84 100644 --- a/processing/src/main/java/org/apache/druid/query/extraction/MapLookupExtractor.java +++ b/processing/src/main/java/org/apache/druid/query/extraction/MapLookupExtractor.java @@ -41,6 +41,9 @@ import java.util.stream.Collectors; @JsonTypeName("map") public class MapLookupExtractor extends LookupExtractor { + // Each String object has ~40 bytes of overhead, and x 2 for key and value strings + private static final long HEAP_ENTRY_OVERHEAD = 80; + private final Map map; private final boolean isOneToOne; @@ -55,6 +58,37 @@ public class MapLookupExtractor extends LookupExtractor this.isOneToOne = isOneToOne; } + /** + * Estimate the heap footprint of a Map. + * + * Important note: the implementation accepts any kind of Map, but estimates zero footprint for keys and values of + * types other than String. + */ + public static long estimateHeapFootprint(@Nullable final Map map) + { + if (map == null) { + return 0; + } + + final int numEntries = map.size(); + long numChars = 0; + + for (Map.Entry sEntry : map.entrySet()) { + final K key = sEntry.getKey(); + final V value = sEntry.getValue(); + + if (key instanceof String) { + numChars += ((String) key).length(); + } + + if (value instanceof String) { + numChars += ((String) value).length(); + } + } + + return HEAP_ENTRY_OVERHEAD * numEntries + numChars * Character.BYTES; + } + @JsonProperty public Map getMap() { @@ -147,6 +181,12 @@ public class MapLookupExtractor extends LookupExtractor return Collections.unmodifiableSet(map.keySet()); } + @Override + public long estimateHeapFootprint() + { + return estimateHeapFootprint(map); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractor.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractor.java index f24a965b040..728ecb4de17 100644 --- a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractor.java +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractor.java @@ -137,4 +137,16 @@ public abstract class LookupExtractor { return false; } + + /** + * Estimated heap footprint of this object. Not guaranteed to be accurate. For example, some implementations return + * zero even though they do use on-heap structures. However, the most common class, {@link MapLookupExtractor}, + * does have a reasonable implementation. + * + * This API is provided for best-effort memory management and monitoring. + */ + public long estimateHeapFootprint() + { + return 0; + } } diff --git a/processing/src/test/java/org/apache/druid/query/extraction/MapLookupExtractorTest.java b/processing/src/test/java/org/apache/druid/query/extraction/MapLookupExtractorTest.java index ab180646e58..880618b27ef 100644 --- a/processing/src/test/java/org/apache/druid/query/extraction/MapLookupExtractorTest.java +++ b/processing/src/test/java/org/apache/druid/query/extraction/MapLookupExtractorTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -105,6 +106,39 @@ public class MapLookupExtractorTest ); } + @Test + public void testEstimateHeapFootprint() + { + Assert.assertEquals(0L, new MapLookupExtractor(Collections.emptyMap(), false).estimateHeapFootprint()); + Assert.assertEquals(388L, new MapLookupExtractor(ImmutableMap.copyOf(lookupMap), false).estimateHeapFootprint()); + } + + @Test + public void testEstimateHeapFootprintStatic() + { + Assert.assertEquals(0L, MapLookupExtractor.estimateHeapFootprint(null)); + Assert.assertEquals(0L, MapLookupExtractor.estimateHeapFootprint(Collections.emptyMap())); + Assert.assertEquals(388L, MapLookupExtractor.estimateHeapFootprint(ImmutableMap.copyOf(lookupMap))); + } + + @Test + public void testEstimateHeapFootprintStaticNullKeysAndValues() + { + final Map mapWithNullKeysAndNullValues = new HashMap<>(); + mapWithNullKeysAndNullValues.put("foo", "bar"); + mapWithNullKeysAndNullValues.put("foo2", null); + Assert.assertEquals(180L, MapLookupExtractor.estimateHeapFootprint(mapWithNullKeysAndNullValues)); + } + + @Test + public void testEstimateHeapFootprintStaticNonStringKeysAndValues() + { + final Map mapWithNonStringKeysAndValues = new HashMap<>(); + mapWithNonStringKeysAndValues.put(3L, 1); + mapWithNonStringKeysAndValues.put(4L, 3.2); + Assert.assertEquals(160L, MapLookupExtractor.estimateHeapFootprint(mapWithNonStringKeysAndValues)); + } + @Test public void testEquals() {