diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b8d4d6a4a50..287f6872d3b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -395,6 +395,7 @@ Metric monitoring is an essential part of Druid operations. The following monit |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.| |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured. | |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.| +|`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes and various other statistics related to the subquery execution| |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.| |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.| |`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index e4a144e4822..38e68e81c76 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -72,6 +72,13 @@ Metrics may have additional dimensions beyond those listed above. |`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`| |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| +|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/insufficientType/count`|Number of subqueries which cannot be materialized as frames due to insufficient type information in the row signature.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | ### Historical diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 223048200f5..f5fda5f4e48 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -74,6 +74,7 @@ import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -310,6 +311,15 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest Assert.assertNotNull(expectedResults); Assert.assertThat(expectedResults, IsInstanceOf.instanceOf(List.class)); + DruidHttpClientConfig httpClientConfig = new DruidHttpClientConfig() + { + @Override + public long getMaxQueuedBytes() + { + return 0L; + } + }; + CachingClusteredClient baseClient = new CachingClusteredClient( warehouse, new TimelineServerView() @@ -354,14 +364,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest jsonMapper, new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), new CacheConfig(), - new DruidHttpClientConfig() - { - @Override - public long getMaxQueuedBytes() - { - return 0L; - } - }, + httpClientConfig, new BrokerParallelMergeConfig(), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, @@ -384,7 +387,10 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest jsonMapper, serverConfig, null, - new CacheConfig() + new CacheConfig(), + null, + httpClientConfig, + new SubqueryCountStatsProvider() ); defineMocks(); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 403cef1fa4f..c247ab0e639 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -347,9 +347,14 @@ public class QueryContext return getInt(QueryContexts.MAX_SUBQUERY_ROWS_KEY, defaultSize); } - public long getMaxSubqueryMemoryBytes(long defaultMemoryBytes) + public String getMaxSubqueryMemoryBytes(String defaultMemoryBytes) { - return getLong(QueryContexts.MAX_SUBQUERY_BYTES_KEY, defaultMemoryBytes); + // Generic to allow for both strings and numbers to be passed as values in the query context + Object maxSubqueryBytesObject = get(QueryContexts.MAX_SUBQUERY_BYTES_KEY); + if (maxSubqueryBytesObject == null) { + maxSubqueryBytesObject = defaultMemoryBytes; + } + return String.valueOf(maxSubqueryBytesObject); } public boolean isUseNestedForUnknownTypeInSubquery(boolean defaultUseNestedForUnkownTypeInSubquery) diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java index ebdbded3a72..54acab0a3f8 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java @@ -325,6 +325,23 @@ public class QueryContextTest assertThrows(BadQueryContextException.class, () -> context.getHumanReadableBytes("m6", HumanReadableBytes.ZERO)); } + @Test + public void testGetMaxSubqueryBytes() + { + final QueryContext context1 = new QueryContext( + ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, 500_000_000) + ); + assertEquals("500000000", context1.getMaxSubqueryMemoryBytes(null)); + + final QueryContext context2 = new QueryContext( + ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto") + ); + assertEquals("auto", context2.getMaxSubqueryMemoryBytes(null)); + + final QueryContext context3 = new QueryContext(ImmutableMap.of()); + assertEquals("unlimited", context3.getMaxSubqueryMemoryBytes("unlimited")); + } + @Test public void testDefaultEnableQueryDebugging() { diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 792c52f0032..3cdaec0d4a3 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -53,6 +53,7 @@ import java.io.File; import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -310,6 +311,9 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP @Override public Set getAllLookupNames() { + if (stateRef.get() == null) { + return Collections.emptySet(); + } return stateRef.get().lookupMap.keySet(); } diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 4054a7e6486..9f21fe8e687 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -28,8 +28,11 @@ import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.frame.write.UnsupportedColumnTypeException; +import org.apache.druid.guice.annotations.Client; +import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; @@ -57,10 +60,12 @@ import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -103,6 +108,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final ServerConfig serverConfig; private final Cache cache; private final CacheConfig cacheConfig; + private final SubqueryGuardrailHelper subqueryGuardrailHelper; + private final SubqueryCountStatsProvider subqueryStatsProvider; public ClientQuerySegmentWalker( ServiceEmitter emitter, @@ -114,7 +121,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker ObjectMapper objectMapper, ServerConfig serverConfig, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + LookupExtractorFactoryContainerProvider lookupManager, + DruidHttpClientConfig httpClientConfig, + SubqueryCountStatsProvider subqueryStatsProvider ) { this.emitter = emitter; @@ -127,6 +137,12 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker this.serverConfig = serverConfig; this.cache = cache; this.cacheConfig = cacheConfig; + this.subqueryGuardrailHelper = new SubqueryGuardrailHelper( + lookupManager, + Runtime.getRuntime().maxMemory(), + httpClientConfig.getNumConnections() + ); + this.subqueryStatsProvider = subqueryStatsProvider; } @Inject @@ -140,7 +156,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker ObjectMapper objectMapper, ServerConfig serverConfig, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + LookupExtractorFactoryContainerProvider lookupManager, + @Client DruidHttpClientConfig httpClientConfig, + SubqueryCountStatsProvider subqueryStatsProvider ) { this( @@ -153,7 +172,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker objectMapper, serverConfig, cache, - cacheConfig + cacheConfig, + lookupManager, + httpClientConfig, + subqueryStatsProvider ); } @@ -175,10 +197,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final DataSource freeTradeDataSource = globalizeIfPossible(newQuery.getDataSource()); // do an inlining dry run to see if any inlining is necessary, without actually running the queries. final int maxSubqueryRows = query.context().getMaxSubqueryRows(serverConfig.getMaxSubqueryRows()); - final long maxSubqueryMemory = query.context().getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes()); + final String maxSubqueryMemoryString = query.context() + .getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes()); + final long maxSubqueryMemory = subqueryGuardrailHelper.convertSubqueryLimitStringToLong(maxSubqueryMemoryString); final boolean useNestedForUnknownTypeInSubquery = query.context() .isUseNestedForUnknownTypeInSubquery(serverConfig.isuseNestedForUnknownTypeInSubquery()); + final DataSource inlineDryRun = inlineIfNecessary( freeTradeDataSource, toolChest, @@ -426,7 +451,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker cannotMaterializeToFrames, maxSubqueryRows, maxSubqueryMemory, - useNestedForUnknownTypeInSubquery + useNestedForUnknownTypeInSubquery, + subqueryStatsProvider ); } else { // Cannot inline subquery. Attempt to inline one level deeper, and then try again. @@ -641,7 +667,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final AtomicBoolean cannotMaterializeToFrames, final int limit, long memoryLimit, - boolean useNestedForUnknownTypeInSubquery + boolean useNestedForUnknownTypeInSubquery, + SubqueryCountStatsProvider subqueryStatsProvider ) { final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; @@ -651,21 +678,25 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, cannotMaterializeToFrames.get())) { case ROW_LIMIT: if (limitAccumulator.get() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", rowLimitToUse ); } + subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, results, toolChest, limitAccumulator, - limit + limit, + subqueryStatsProvider ); break; case MEMORY_LIMIT: if (memoryLimitAccumulator.get() >= memoryLimit) { + subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage( "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", memoryLimit @@ -678,30 +709,36 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker limitAccumulator, memoryLimitAccumulator, memoryLimit, - useNestedForUnknownTypeInSubquery + useNestedForUnknownTypeInSubquery, + subqueryStatsProvider ); if (!maybeDataSource.isPresent()) { cannotMaterializeToFrames.set(true); // Check if the previous row limit accumulator has exceeded the memory results - if (memoryLimitAccumulator.get() >= memoryLimit) { + if (limitAccumulator.get() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", - memoryLimit + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", + rowLimitToUse ); } + subqueryStatsProvider.incrementSubqueriesWithRowLimit(); + subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, results, toolChest, limitAccumulator, - limit + limit, + subqueryStatsProvider ); } else { + subqueryStatsProvider.incrementSubqueriesWithByteLimit(); dataSource = maybeDataSource.get(); } break; default: - throw new IAE("Only row based and memory based limiting is supported"); + throw DruidException.defensive("Only row based and memory based limiting is supported"); } return dataSource; } @@ -717,7 +754,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, long memoryLimit, - boolean useNestedForUnknownTypeInSubquery + boolean useNestedForUnknownTypeInSubquery, + final SubqueryCountStatsProvider subqueryStatsProvider ) { Optional> framesOptional; @@ -730,7 +768,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker useNestedForUnknownTypeInSubquery ); } + catch (UnsupportedColumnTypeException e) { + subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo(); + log.debug(e, "Type info in signature insufficient to materialize rows as frames."); + return Optional.empty(); + } catch (Exception e) { + subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason(); log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception " + "while conversion. Defaulting to materializing the results as rows"); return Optional.empty(); @@ -747,6 +791,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker frame -> { limitAccumulator.addAndGet(frame.getFrame().numRows()); if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { + subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage( "Subquery generated results beyond maximum[%d] bytes", memoryLimit @@ -767,7 +812,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final Sequence results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, - final int limit + final int limit, + final SubqueryCountStatsProvider subqueryStatsProvider ) { final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; @@ -779,6 +825,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker resultList, (acc, in) -> { if (limitAccumulator.getAndIncrement() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( "Subquery generated results beyond maximum[%d] rows", rowLimitToUse diff --git a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java new file mode 100644 index 00000000000..541f4474481 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +/** + * Aids the {@link ClientQuerySegmentWalker} compute the available heap size per query for materializing the inline + * results from the subqueries + */ +public class SubqueryGuardrailHelper +{ + private static final double SUBQUERY_MEMORY_BYTES_FRACTION = 0.5; + private static final Logger log = new Logger(SubqueryGuardrailHelper.class); + + + public static final String UNLIMITED_LIMIT_VALUE = "unlimited"; + public static final String AUTO_LIMIT_VALUE = "auto"; + + public static final Long UNLIMITED_LIMIT_REPRESENTATION = -1L; + + private final long autoLimitBytes; + + public SubqueryGuardrailHelper( + final LookupExtractorFactoryContainerProvider lookupManager, + final long maxMemoryInJvm, + final int brokerNumHttpConnections + ) + { + final DateTime start = DateTimes.nowUtc(); + autoLimitBytes = computeLimitBytesForAuto(lookupManager, maxMemoryInJvm, brokerNumHttpConnections); + final long startupTimeMs = new Interval(start, DateTimes.nowUtc()).toDurationMillis(); + + log.info("Took [%d] ms to initialize the SubqueryGuardrailHelper.", startupTimeMs); + + if (startupTimeMs >= 10_000) { + log.warn("Took more than 10 seconds to initialize the SubqueryGuardrailHelper. " + + "This happens when the lookup sizes are very large. " + + "Consider lowering the size of the lookups to reduce the initialization time." + ); + } + + log.info("Each query has a memory limit of [%d] bytes to materialize its subqueries' results if auto " + + "limit is used", autoLimitBytes); + } + + public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit) + { + if (UNLIMITED_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { + return UNLIMITED_LIMIT_REPRESENTATION; + } + if (AUTO_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { + return autoLimitBytes; + } + + long retVal; + try { + retVal = Long.parseLong(maxSubqueryLimit); + } + catch (NumberFormatException e) { + throw InvalidInput.exception( + e, + "Unable to parse the provided maxSubqueryLimit [%s] to a valid number. Valid values for the " + + "maxSubqueryLimits can be 'auto', 'unlimited' or a positive number representing bytes to reserve.", + maxSubqueryLimit + ); + } + + // This can happen if the provided number is greater than Longs.MAX_VALUE + if (retVal < 0) { + throw InvalidInput.exception("Limit too large"); + } + + return retVal; + } + + /** + * Computes the byte limit when 'auto' is passed as a parameter. This computes the total heap space available + * for the subquery inlining by getting a fraction of the total heap space in JVM, removing the size of the lookups, + * and dividing it by the maximum concurrent queries that can run. Maximum concurrent queries that Druid can + * run is usually limited by its broker's http threadpool size + * + * Consider a JVM running locally with 4 GB heap size, 20 Broker threads and 100 MB space required by the lookups. + * Each query under 'auto' would then get 97.5 MB to materialize the results. Considering the default of 100,000 rows + * and each row consuming 300 - 700 bytes of heap space, the subqueries would approximately consume between 30 MB to + * 70 MB of data, which looks approximately equivalent to what we reserved with auto. This would wildly vary as we have + * larger rows, but that's where the "auto" factor of subquery bytes come into play, where we would estimate by size + * the number of rows that can be materialized based on the memory they consume. + */ + private static long computeLimitBytesForAuto( + final LookupExtractorFactoryContainerProvider lookupManager, + final long maxMemoryInJvm, + final int brokerNumHttpConnections + ) + { + long memoryInJvmWithoutLookups = maxMemoryInJvm - computeLookupFootprint(lookupManager); + long memoryInJvmForSubqueryResultsInlining = (long) (memoryInJvmWithoutLookups * SUBQUERY_MEMORY_BYTES_FRACTION); + long memoryInJvmForSubqueryResultsInliningPerQuery = memoryInJvmForSubqueryResultsInlining + / brokerNumHttpConnections; + return Math.max(memoryInJvmForSubqueryResultsInliningPerQuery, 1L); + } + + /** + * Computes the size occupied by the lookups. If the size of the lookup cannot be computed, it skips over the lookup + */ + private static long computeLookupFootprint(final LookupExtractorFactoryContainerProvider lookupManager) + { + + if (lookupManager == null || lookupManager.getAllLookupNames() == null) { + log.warn("Failed to get the lookupManager for estimating lookup size. Skipping."); + return 0; + } + + int lookupCount = 0; + long lookupFootprint = 0; + + for (final String lookupName : lookupManager.getAllLookupNames()) { + final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null); + + if (container != null) { + try { + final LookupExtractor extractor = container.getLookupExtractorFactory().get(); + lookupFootprint += extractor.estimateHeapFootprint(); + lookupCount++; + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName); + } + } + } + + log.debug("Lookup footprint: [%d] lookups with [%,d] total bytes.", lookupCount, lookupFootprint); + + return lookupFootprint; + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 276a0030af4..04366d30dcd 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -44,7 +44,7 @@ import java.util.zip.Deflater; public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; - public static final long DEFAULT_MAX_SUBQUERY_BYTES = -1L; + public static final String DEFAULT_MAX_SUBQUERY_BYTES = "unlimited"; private static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY = false; @@ -61,7 +61,7 @@ public class ServerConfig long defaultQueryTimeout, long maxScatterGatherBytes, int maxSubqueryRows, - long maxSubqueryBytes, + String maxSubqueryBytes, boolean useNestedForUnknownTypeInSubquery, long maxQueryTimeout, int maxRequestHeaderSize, @@ -140,7 +140,7 @@ public class ServerConfig private int maxSubqueryRows = 100000; @JsonProperty - private long maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES; + private String maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES; @JsonProperty private boolean useNestedForUnknownTypeInSubquery = DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY; @@ -231,7 +231,7 @@ public class ServerConfig return maxSubqueryRows; } - public long getMaxSubqueryBytes() + public String getMaxSubqueryBytes() { return maxSubqueryBytes; } @@ -322,7 +322,7 @@ public class ServerConfig enableRequestLimit == that.enableRequestLimit && defaultQueryTimeout == that.defaultQueryTimeout && maxSubqueryRows == that.maxSubqueryRows && - maxSubqueryBytes == that.maxSubqueryBytes && + Objects.equals(maxSubqueryBytes, that.maxSubqueryBytes) && useNestedForUnknownTypeInSubquery == that.useNestedForUnknownTypeInSubquery && maxQueryTimeout == that.maxQueryTimeout && maxRequestHeaderSize == that.maxRequestHeaderSize && diff --git a/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java new file mode 100644 index 00000000000..57da36ac154 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.java.util.metrics.KeyedDiff; + +import java.util.Map; + +/** + * Monitors and emits the metrics corresponding to the subqueries and their materialization. + */ +public class SubqueryCountStatsMonitor extends AbstractMonitor +{ + + private static final String KEY = "subqueryCountStats"; + + private static final String ROW_LIMIT_COUNT = "subquery/rowLimit/count"; + private static final String BYTE_LIMIT_COUNT = "subquery/byteLimit/count"; + private static final String FALLBACK_COUNT = "subquery/fallback/count"; + private static final String INSUFFICIENT_TYPE_COUNT = "subquery/fallback/insufficientType/count"; + private static final String UNKNOWN_REASON_COUNT = "subquery/fallback/unknownReason/count"; + private static final String ROW_LIMIT_EXCEEDED_COUNT = "query/rowLimit/exceeded/count"; + private static final String BYTE_LIMIT_EXCEEDED_COUNT = "query/byteLimit/exceeded/count"; + + private final KeyedDiff keyedDiff = new KeyedDiff(); + private final SubqueryCountStatsProvider statsProvider; + + @Inject + public SubqueryCountStatsMonitor(SubqueryCountStatsProvider statsProvider) + { + this.statsProvider = statsProvider; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + final long subqueriesWithRowBasedLimit = statsProvider.subqueriesWithRowLimit(); + final long subqueriesWithByteBasedLimit = statsProvider.subqueriesWithByteLimit(); + final long subqueriesFallingBackToRowBasedLimit = statsProvider.subqueriesFallingBackToRowLimit(); + final long subqueriesFallingBackDueToUnsufficientTypeInfo = statsProvider.subqueriesFallingBackDueToUnsufficientTypeInfo(); + final long subqueriesFallingBackDueToUnknownReason = statsProvider.subqueriesFallingBackDueUnknownReason(); + final long queriesExceedingRowLimit = statsProvider.queriesExceedingRowLimit(); + final long queriesExceedingByteLimit = statsProvider.queriesExceedingByteLimit(); + + Map diff = keyedDiff.to( + KEY, + ImmutableMap.of( + ROW_LIMIT_COUNT, subqueriesWithRowBasedLimit, + BYTE_LIMIT_COUNT, subqueriesWithByteBasedLimit, + FALLBACK_COUNT, subqueriesFallingBackToRowBasedLimit, + INSUFFICIENT_TYPE_COUNT, subqueriesFallingBackDueToUnsufficientTypeInfo, + UNKNOWN_REASON_COUNT, subqueriesFallingBackDueToUnknownReason, + ROW_LIMIT_EXCEEDED_COUNT, queriesExceedingRowLimit, + BYTE_LIMIT_EXCEEDED_COUNT, queriesExceedingByteLimit + ) + ); + + if (diff != null) { + for (Map.Entry diffEntry : diff.entrySet()) { + emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue())); + } + } + + return true; + } +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java new file mode 100644 index 00000000000..c7007765496 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Collects the metrics corresponding to the subqueries and their materialization. + */ +public class SubqueryCountStatsProvider +{ + + private final AtomicLong successfulSubqueriesWithRowLimit = new AtomicLong(); + private final AtomicLong successfulSubqueriesWithByteLimit = new AtomicLong(); + private final AtomicLong subqueriesFallingBackToRowLimit = new AtomicLong(); + private final AtomicLong subqueriesFallingBackDueToUnsufficientTypeInfo = new AtomicLong(); + private final AtomicLong subqueriesFallingBackDueToUnknownReason = new AtomicLong(); + private final AtomicLong queriesExceedingRowLimit = new AtomicLong(); + private final AtomicLong queriesExceedingByteLimit = new AtomicLong(); + + /** + * @return Count of subqueries where the results are materialized as rows i.e. {@code List} + */ + public long subqueriesWithRowLimit() + { + return successfulSubqueriesWithRowLimit.get(); + } + + /** + * @return Count of subqueries where the results are materialized as {@link org.apache.druid.frame.Frame} + */ + public long subqueriesWithByteLimit() + { + return successfulSubqueriesWithByteLimit.get(); + } + + /** + * @return Count of subqueries where the results are + */ + public long subqueriesFallingBackToRowLimit() + { + return subqueriesFallingBackToRowLimit.get(); + } + + /** + * @return Count of the subset of subqueries that are falling back due to insufficient type information in the + * {@link org.apache.druid.segment.column.RowSignature}. This is expected to be the most common and already known + * cause of fallback, therefore this is added as a separate metric + */ + public long subqueriesFallingBackDueToUnsufficientTypeInfo() + { + return subqueriesFallingBackDueToUnsufficientTypeInfo.get(); + } + + /** + * @return Count of the subset of subqueries that are falling back due to insufficient an unknown error. This can be due to a + * few known reasons like columnar frames not supporting the array types right now, or due to unknown errors while + * performing the materialization + */ + public long subqueriesFallingBackDueUnknownReason() + { + return subqueriesFallingBackDueToUnknownReason.get(); + } + + /** + * @return Number of queries that fail due to their subqueries exceeding the prescribed row limit + */ + public long queriesExceedingRowLimit() + { + return queriesExceedingRowLimit.get(); + } + + /** + * @return Number of subqueries that fail due to their subqueries exceeding the prescribed byte limit + */ + public long queriesExceedingByteLimit() + { + return queriesExceedingByteLimit.get(); + } + + + public void incrementSubqueriesWithRowLimit() + { + successfulSubqueriesWithRowLimit.incrementAndGet(); + } + + public void incrementSubqueriesWithByteLimit() + { + successfulSubqueriesWithByteLimit.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackToRowLimit() + { + subqueriesFallingBackToRowLimit.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackDueToUnsufficientTypeInfo() + { + subqueriesFallingBackDueToUnsufficientTypeInfo.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackDueToUnknownReason() + { + subqueriesFallingBackDueToUnknownReason.incrementAndGet(); + } + + public void incrementQueriesExceedingRowLimit() + { + queriesExceedingRowLimit.incrementAndGet(); + } + + public void incrementQueriesExceedingByteLimit() + { + queriesExceedingByteLimit.incrementAndGet(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 7180b03983f..fcdc894f61f 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -1439,7 +1439,8 @@ public class ClientQuerySegmentWalkerTest ), conglomerate, joinableFactory, - serverConfig + serverConfig, + null ); } diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 1c9f15acf79..63a751bbb89 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -22,6 +22,7 @@ package org.apache.druid.server; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BrokerParallelMergeConfig; @@ -79,6 +80,7 @@ import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -116,7 +118,8 @@ public class QueryStackTests final QuerySegmentWalker localWalker, final QueryRunnerFactoryConglomerate conglomerate, final JoinableFactory joinableFactory, - final ServerConfig serverConfig + final ServerConfig serverConfig, + final LookupExtractorFactoryContainerProvider lookupManager ) { return new ClientQuerySegmentWalker( @@ -161,7 +164,17 @@ public class QueryStackTests { return false; } - } + }, + lookupManager, + new DruidHttpClientConfig() + { + @Override + public int getNumConnections() + { + return 1; + } + }, + new SubqueryCountStatsProvider() ); } diff --git a/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java b/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java new file mode 100644 index 00000000000..07cecb59680 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.query.lookup.LookupExtractorFactory; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +/** + * Tests that the subquery limit utils gives out valid byte limits when supplied with 'auto'. + * Assuming that the average segment size is 500 MB to 1 GB, and optimal number of rows in the segment is 5 million, the + * typical bytes per row can be from 100 bytes to 200 bytes. We can keep this in mind while trying to look through the test + * values + */ +public class SubqueryGuardrailHelperTest +{ + + @Test + public void testConvertSubqueryLimitStringToLongWithoutLookups() + { + Assert.assertEquals( + 10737418L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("512MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 13421772L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("640MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 16106127L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("768MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 21474836L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("1GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 171798691L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("8GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 429496729L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("20GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + } + + @Test + public void testConvertSubqueryLimitStringToLongWithLookups() + { + Assert.assertEquals( + 10527703L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("512MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 13212057L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("640MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 15896412, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("768MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 21265121L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("1GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 171588976L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("8GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 429287014L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("20GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + } + + + private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForNoLookups(long maxMemoryInJvm, int brokerNumHttpConnections) + { + return new SubqueryGuardrailHelper(null, maxMemoryInJvm, brokerNumHttpConnections); + } + + private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForLookups(long maxMemoryInJvm, int brokerNumHttpConnections) + { + return new SubqueryGuardrailHelper(lookupManager(), maxMemoryInJvm, brokerNumHttpConnections); + } + + + private static long humanReadableSizeToBytes(String humanReadableSize) + { + return new HumanReadableBytes(humanReadableSize).getBytes(); + } + + public static LookupExtractorFactoryContainerProvider lookupManager() + { + LookupExtractorFactoryContainerProvider lookupManager = EasyMock.mock(LookupExtractorFactoryContainerProvider.class); + + EasyMock.expect(lookupManager.getAllLookupNames()).andReturn(ImmutableSet.of("lookupFoo", "lookupBar")).anyTimes(); + + LookupExtractorFactoryContainer lookupFooContainer = EasyMock.mock(LookupExtractorFactoryContainer.class); + EasyMock.expect(lookupManager.get("lookupFoo")).andReturn(Optional.of(lookupFooContainer)); + LookupExtractorFactory lookupFooExtractorFactory = EasyMock.mock(LookupExtractorFactory.class); + EasyMock.expect(lookupFooContainer.getLookupExtractorFactory()).andReturn(lookupFooExtractorFactory); + LookupExtractor lookupFooExtractor = EasyMock.mock(LookupExtractor.class); + EasyMock.expect(lookupFooExtractorFactory.get()).andReturn(lookupFooExtractor); + EasyMock.expect(lookupFooExtractor.estimateHeapFootprint()).andReturn(humanReadableSizeToBytes("10MiB")); + + EasyMock.expect(lookupManager.get("lookupBar")).andReturn(Optional.empty()); + + EasyMock.replay(lookupManager, lookupFooContainer, lookupFooExtractor, lookupFooExtractorFactory); + + return lookupManager; + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index dd9c3c3c0b5..1de8a0f1d47 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -69,6 +69,7 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.guice.SqlModule; import org.apache.druid.timeline.PruneLoadSpec; @@ -146,6 +147,7 @@ public class CliBroker extends ServerRunnable binder.bind(BrokerQueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, BrokerQueryResource.class); binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class); + binder.bind(SubqueryCountStatsProvider.class).toInstance(new SubqueryCountStatsProvider()); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index b43a6515956..a47bbcf9577 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C ), conglomerate, joinableFactoryWrapper.getJoinableFactory(), - new ServerConfig() + new ServerConfig(), + LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER ); }