From 6ee0b06e38d9512c9240f55344ed185d2f62a3a6 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 6 Sep 2023 05:47:19 +0000 Subject: [PATCH] Auto configuration for maxSubqueryBytes (#14808) A new monitor SubqueryCountStatsMonitor which emits the metrics corresponding to the subqueries and their execution is now introduced. Moreover, the user can now also use the auto mode to automatically set the number of bytes available per query for the inlining of its subquery's results. --- docs/configuration/index.md | 1 + docs/operations/metrics.md | 7 + .../movingaverage/MovingAverageQueryTest.java | 24 ++- .../org/apache/druid/query/QueryContext.java | 9 +- .../apache/druid/query/QueryContextTest.java | 17 ++ .../query/lookup/LookupReferencesManager.java | 4 + .../server/ClientQuerySegmentWalker.java | 79 +++++++-- .../druid/server/SubqueryGuardrailHelper.java | 160 ++++++++++++++++++ .../server/initialization/ServerConfig.java | 10 +- .../metrics/SubqueryCountStatsMonitor.java | 89 ++++++++++ .../metrics/SubqueryCountStatsProvider.java | 133 +++++++++++++++ .../server/ClientQuerySegmentWalkerTest.java | 3 +- .../apache/druid/server/QueryStackTests.java | 17 +- .../server/SubqueryGuardrailHelperTest.java | 160 ++++++++++++++++++ .../java/org/apache/druid/cli/CliBroker.java | 2 + .../SpecificSegmentsQuerySegmentWalker.java | 3 +- 16 files changed, 682 insertions(+), 36 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java create mode 100644 server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b8d4d6a4a50..287f6872d3b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -395,6 +395,7 @@ Metric monitoring is an essential part of Druid operations. The following monit |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.| |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured. | |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.| +|`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes and various other statistics related to the subquery execution| |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.| |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.| |`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index e4a144e4822..38e68e81c76 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -72,6 +72,13 @@ Metrics may have additional dimensions beyond those listed above. |`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`| |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| +|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/insufficientType/count`|Number of subqueries which cannot be materialized as frames due to insufficient type information in the row signature.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | ### Historical diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 223048200f5..f5fda5f4e48 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -74,6 +74,7 @@ import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -310,6 +311,15 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest Assert.assertNotNull(expectedResults); Assert.assertThat(expectedResults, IsInstanceOf.instanceOf(List.class)); + DruidHttpClientConfig httpClientConfig = new DruidHttpClientConfig() + { + @Override + public long getMaxQueuedBytes() + { + return 0L; + } + }; + CachingClusteredClient baseClient = new CachingClusteredClient( warehouse, new TimelineServerView() @@ -354,14 +364,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest jsonMapper, new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), new CacheConfig(), - new DruidHttpClientConfig() - { - @Override - public long getMaxQueuedBytes() - { - return 0L; - } - }, + httpClientConfig, new BrokerParallelMergeConfig(), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, @@ -384,7 +387,10 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest jsonMapper, serverConfig, null, - new CacheConfig() + new CacheConfig(), + null, + httpClientConfig, + new SubqueryCountStatsProvider() ); defineMocks(); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 403cef1fa4f..c247ab0e639 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -347,9 +347,14 @@ public class QueryContext return getInt(QueryContexts.MAX_SUBQUERY_ROWS_KEY, defaultSize); } - public long getMaxSubqueryMemoryBytes(long defaultMemoryBytes) + public String getMaxSubqueryMemoryBytes(String defaultMemoryBytes) { - return getLong(QueryContexts.MAX_SUBQUERY_BYTES_KEY, defaultMemoryBytes); + // Generic to allow for both strings and numbers to be passed as values in the query context + Object maxSubqueryBytesObject = get(QueryContexts.MAX_SUBQUERY_BYTES_KEY); + if (maxSubqueryBytesObject == null) { + maxSubqueryBytesObject = defaultMemoryBytes; + } + return String.valueOf(maxSubqueryBytesObject); } public boolean isUseNestedForUnknownTypeInSubquery(boolean defaultUseNestedForUnkownTypeInSubquery) diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java index ebdbded3a72..54acab0a3f8 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java @@ -325,6 +325,23 @@ public class QueryContextTest assertThrows(BadQueryContextException.class, () -> context.getHumanReadableBytes("m6", HumanReadableBytes.ZERO)); } + @Test + public void testGetMaxSubqueryBytes() + { + final QueryContext context1 = new QueryContext( + ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, 500_000_000) + ); + assertEquals("500000000", context1.getMaxSubqueryMemoryBytes(null)); + + final QueryContext context2 = new QueryContext( + ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto") + ); + assertEquals("auto", context2.getMaxSubqueryMemoryBytes(null)); + + final QueryContext context3 = new QueryContext(ImmutableMap.of()); + assertEquals("unlimited", context3.getMaxSubqueryMemoryBytes("unlimited")); + } + @Test public void testDefaultEnableQueryDebugging() { diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 792c52f0032..3cdaec0d4a3 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -53,6 +53,7 @@ import java.io.File; import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -310,6 +311,9 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP @Override public Set getAllLookupNames() { + if (stateRef.get() == null) { + return Collections.emptySet(); + } return stateRef.get().lookupMap.keySet(); } diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 4054a7e6486..9f21fe8e687 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -28,8 +28,11 @@ import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.frame.write.UnsupportedColumnTypeException; +import org.apache.druid.guice.annotations.Client; +import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; @@ -57,10 +60,12 @@ import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -103,6 +108,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final ServerConfig serverConfig; private final Cache cache; private final CacheConfig cacheConfig; + private final SubqueryGuardrailHelper subqueryGuardrailHelper; + private final SubqueryCountStatsProvider subqueryStatsProvider; public ClientQuerySegmentWalker( ServiceEmitter emitter, @@ -114,7 +121,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker ObjectMapper objectMapper, ServerConfig serverConfig, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + LookupExtractorFactoryContainerProvider lookupManager, + DruidHttpClientConfig httpClientConfig, + SubqueryCountStatsProvider subqueryStatsProvider ) { this.emitter = emitter; @@ -127,6 +137,12 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker this.serverConfig = serverConfig; this.cache = cache; this.cacheConfig = cacheConfig; + this.subqueryGuardrailHelper = new SubqueryGuardrailHelper( + lookupManager, + Runtime.getRuntime().maxMemory(), + httpClientConfig.getNumConnections() + ); + this.subqueryStatsProvider = subqueryStatsProvider; } @Inject @@ -140,7 +156,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker ObjectMapper objectMapper, ServerConfig serverConfig, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + LookupExtractorFactoryContainerProvider lookupManager, + @Client DruidHttpClientConfig httpClientConfig, + SubqueryCountStatsProvider subqueryStatsProvider ) { this( @@ -153,7 +172,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker objectMapper, serverConfig, cache, - cacheConfig + cacheConfig, + lookupManager, + httpClientConfig, + subqueryStatsProvider ); } @@ -175,10 +197,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final DataSource freeTradeDataSource = globalizeIfPossible(newQuery.getDataSource()); // do an inlining dry run to see if any inlining is necessary, without actually running the queries. final int maxSubqueryRows = query.context().getMaxSubqueryRows(serverConfig.getMaxSubqueryRows()); - final long maxSubqueryMemory = query.context().getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes()); + final String maxSubqueryMemoryString = query.context() + .getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes()); + final long maxSubqueryMemory = subqueryGuardrailHelper.convertSubqueryLimitStringToLong(maxSubqueryMemoryString); final boolean useNestedForUnknownTypeInSubquery = query.context() .isUseNestedForUnknownTypeInSubquery(serverConfig.isuseNestedForUnknownTypeInSubquery()); + final DataSource inlineDryRun = inlineIfNecessary( freeTradeDataSource, toolChest, @@ -426,7 +451,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker cannotMaterializeToFrames, maxSubqueryRows, maxSubqueryMemory, - useNestedForUnknownTypeInSubquery + useNestedForUnknownTypeInSubquery, + subqueryStatsProvider ); } else { // Cannot inline subquery. Attempt to inline one level deeper, and then try again. @@ -641,7 +667,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final AtomicBoolean cannotMaterializeToFrames, final int limit, long memoryLimit, - boolean useNestedForUnknownTypeInSubquery + boolean useNestedForUnknownTypeInSubquery, + SubqueryCountStatsProvider subqueryStatsProvider ) { final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; @@ -651,21 +678,25 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, cannotMaterializeToFrames.get())) { case ROW_LIMIT: if (limitAccumulator.get() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", rowLimitToUse ); } + subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, results, toolChest, limitAccumulator, - limit + limit, + subqueryStatsProvider ); break; case MEMORY_LIMIT: if (memoryLimitAccumulator.get() >= memoryLimit) { + subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage( "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", memoryLimit @@ -678,30 +709,36 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker limitAccumulator, memoryLimitAccumulator, memoryLimit, - useNestedForUnknownTypeInSubquery + useNestedForUnknownTypeInSubquery, + subqueryStatsProvider ); if (!maybeDataSource.isPresent()) { cannotMaterializeToFrames.set(true); // Check if the previous row limit accumulator has exceeded the memory results - if (memoryLimitAccumulator.get() >= memoryLimit) { + if (limitAccumulator.get() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", - memoryLimit + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", + rowLimitToUse ); } + subqueryStatsProvider.incrementSubqueriesWithRowLimit(); + subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, results, toolChest, limitAccumulator, - limit + limit, + subqueryStatsProvider ); } else { + subqueryStatsProvider.incrementSubqueriesWithByteLimit(); dataSource = maybeDataSource.get(); } break; default: - throw new IAE("Only row based and memory based limiting is supported"); + throw DruidException.defensive("Only row based and memory based limiting is supported"); } return dataSource; } @@ -717,7 +754,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, long memoryLimit, - boolean useNestedForUnknownTypeInSubquery + boolean useNestedForUnknownTypeInSubquery, + final SubqueryCountStatsProvider subqueryStatsProvider ) { Optional> framesOptional; @@ -730,7 +768,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker useNestedForUnknownTypeInSubquery ); } + catch (UnsupportedColumnTypeException e) { + subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo(); + log.debug(e, "Type info in signature insufficient to materialize rows as frames."); + return Optional.empty(); + } catch (Exception e) { + subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason(); log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception " + "while conversion. Defaulting to materializing the results as rows"); return Optional.empty(); @@ -747,6 +791,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker frame -> { limitAccumulator.addAndGet(frame.getFrame().numRows()); if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { + subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage( "Subquery generated results beyond maximum[%d] bytes", memoryLimit @@ -767,7 +812,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final Sequence results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, - final int limit + final int limit, + final SubqueryCountStatsProvider subqueryStatsProvider ) { final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; @@ -779,6 +825,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker resultList, (acc, in) -> { if (limitAccumulator.getAndIncrement() >= rowLimitToUse) { + subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( "Subquery generated results beyond maximum[%d] rows", rowLimitToUse diff --git a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java new file mode 100644 index 00000000000..541f4474481 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +/** + * Aids the {@link ClientQuerySegmentWalker} compute the available heap size per query for materializing the inline + * results from the subqueries + */ +public class SubqueryGuardrailHelper +{ + private static final double SUBQUERY_MEMORY_BYTES_FRACTION = 0.5; + private static final Logger log = new Logger(SubqueryGuardrailHelper.class); + + + public static final String UNLIMITED_LIMIT_VALUE = "unlimited"; + public static final String AUTO_LIMIT_VALUE = "auto"; + + public static final Long UNLIMITED_LIMIT_REPRESENTATION = -1L; + + private final long autoLimitBytes; + + public SubqueryGuardrailHelper( + final LookupExtractorFactoryContainerProvider lookupManager, + final long maxMemoryInJvm, + final int brokerNumHttpConnections + ) + { + final DateTime start = DateTimes.nowUtc(); + autoLimitBytes = computeLimitBytesForAuto(lookupManager, maxMemoryInJvm, brokerNumHttpConnections); + final long startupTimeMs = new Interval(start, DateTimes.nowUtc()).toDurationMillis(); + + log.info("Took [%d] ms to initialize the SubqueryGuardrailHelper.", startupTimeMs); + + if (startupTimeMs >= 10_000) { + log.warn("Took more than 10 seconds to initialize the SubqueryGuardrailHelper. " + + "This happens when the lookup sizes are very large. " + + "Consider lowering the size of the lookups to reduce the initialization time." + ); + } + + log.info("Each query has a memory limit of [%d] bytes to materialize its subqueries' results if auto " + + "limit is used", autoLimitBytes); + } + + public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit) + { + if (UNLIMITED_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { + return UNLIMITED_LIMIT_REPRESENTATION; + } + if (AUTO_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { + return autoLimitBytes; + } + + long retVal; + try { + retVal = Long.parseLong(maxSubqueryLimit); + } + catch (NumberFormatException e) { + throw InvalidInput.exception( + e, + "Unable to parse the provided maxSubqueryLimit [%s] to a valid number. Valid values for the " + + "maxSubqueryLimits can be 'auto', 'unlimited' or a positive number representing bytes to reserve.", + maxSubqueryLimit + ); + } + + // This can happen if the provided number is greater than Longs.MAX_VALUE + if (retVal < 0) { + throw InvalidInput.exception("Limit too large"); + } + + return retVal; + } + + /** + * Computes the byte limit when 'auto' is passed as a parameter. This computes the total heap space available + * for the subquery inlining by getting a fraction of the total heap space in JVM, removing the size of the lookups, + * and dividing it by the maximum concurrent queries that can run. Maximum concurrent queries that Druid can + * run is usually limited by its broker's http threadpool size + * + * Consider a JVM running locally with 4 GB heap size, 20 Broker threads and 100 MB space required by the lookups. + * Each query under 'auto' would then get 97.5 MB to materialize the results. Considering the default of 100,000 rows + * and each row consuming 300 - 700 bytes of heap space, the subqueries would approximately consume between 30 MB to + * 70 MB of data, which looks approximately equivalent to what we reserved with auto. This would wildly vary as we have + * larger rows, but that's where the "auto" factor of subquery bytes come into play, where we would estimate by size + * the number of rows that can be materialized based on the memory they consume. + */ + private static long computeLimitBytesForAuto( + final LookupExtractorFactoryContainerProvider lookupManager, + final long maxMemoryInJvm, + final int brokerNumHttpConnections + ) + { + long memoryInJvmWithoutLookups = maxMemoryInJvm - computeLookupFootprint(lookupManager); + long memoryInJvmForSubqueryResultsInlining = (long) (memoryInJvmWithoutLookups * SUBQUERY_MEMORY_BYTES_FRACTION); + long memoryInJvmForSubqueryResultsInliningPerQuery = memoryInJvmForSubqueryResultsInlining + / brokerNumHttpConnections; + return Math.max(memoryInJvmForSubqueryResultsInliningPerQuery, 1L); + } + + /** + * Computes the size occupied by the lookups. If the size of the lookup cannot be computed, it skips over the lookup + */ + private static long computeLookupFootprint(final LookupExtractorFactoryContainerProvider lookupManager) + { + + if (lookupManager == null || lookupManager.getAllLookupNames() == null) { + log.warn("Failed to get the lookupManager for estimating lookup size. Skipping."); + return 0; + } + + int lookupCount = 0; + long lookupFootprint = 0; + + for (final String lookupName : lookupManager.getAllLookupNames()) { + final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null); + + if (container != null) { + try { + final LookupExtractor extractor = container.getLookupExtractorFactory().get(); + lookupFootprint += extractor.estimateHeapFootprint(); + lookupCount++; + } + catch (Exception e) { + log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName); + } + } + } + + log.debug("Lookup footprint: [%d] lookups with [%,d] total bytes.", lookupCount, lookupFootprint); + + return lookupFootprint; + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 276a0030af4..04366d30dcd 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -44,7 +44,7 @@ import java.util.zip.Deflater; public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; - public static final long DEFAULT_MAX_SUBQUERY_BYTES = -1L; + public static final String DEFAULT_MAX_SUBQUERY_BYTES = "unlimited"; private static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY = false; @@ -61,7 +61,7 @@ public class ServerConfig long defaultQueryTimeout, long maxScatterGatherBytes, int maxSubqueryRows, - long maxSubqueryBytes, + String maxSubqueryBytes, boolean useNestedForUnknownTypeInSubquery, long maxQueryTimeout, int maxRequestHeaderSize, @@ -140,7 +140,7 @@ public class ServerConfig private int maxSubqueryRows = 100000; @JsonProperty - private long maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES; + private String maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES; @JsonProperty private boolean useNestedForUnknownTypeInSubquery = DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY; @@ -231,7 +231,7 @@ public class ServerConfig return maxSubqueryRows; } - public long getMaxSubqueryBytes() + public String getMaxSubqueryBytes() { return maxSubqueryBytes; } @@ -322,7 +322,7 @@ public class ServerConfig enableRequestLimit == that.enableRequestLimit && defaultQueryTimeout == that.defaultQueryTimeout && maxSubqueryRows == that.maxSubqueryRows && - maxSubqueryBytes == that.maxSubqueryBytes && + Objects.equals(maxSubqueryBytes, that.maxSubqueryBytes) && useNestedForUnknownTypeInSubquery == that.useNestedForUnknownTypeInSubquery && maxQueryTimeout == that.maxQueryTimeout && maxRequestHeaderSize == that.maxRequestHeaderSize && diff --git a/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java new file mode 100644 index 00000000000..57da36ac154 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsMonitor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.java.util.metrics.KeyedDiff; + +import java.util.Map; + +/** + * Monitors and emits the metrics corresponding to the subqueries and their materialization. + */ +public class SubqueryCountStatsMonitor extends AbstractMonitor +{ + + private static final String KEY = "subqueryCountStats"; + + private static final String ROW_LIMIT_COUNT = "subquery/rowLimit/count"; + private static final String BYTE_LIMIT_COUNT = "subquery/byteLimit/count"; + private static final String FALLBACK_COUNT = "subquery/fallback/count"; + private static final String INSUFFICIENT_TYPE_COUNT = "subquery/fallback/insufficientType/count"; + private static final String UNKNOWN_REASON_COUNT = "subquery/fallback/unknownReason/count"; + private static final String ROW_LIMIT_EXCEEDED_COUNT = "query/rowLimit/exceeded/count"; + private static final String BYTE_LIMIT_EXCEEDED_COUNT = "query/byteLimit/exceeded/count"; + + private final KeyedDiff keyedDiff = new KeyedDiff(); + private final SubqueryCountStatsProvider statsProvider; + + @Inject + public SubqueryCountStatsMonitor(SubqueryCountStatsProvider statsProvider) + { + this.statsProvider = statsProvider; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + final long subqueriesWithRowBasedLimit = statsProvider.subqueriesWithRowLimit(); + final long subqueriesWithByteBasedLimit = statsProvider.subqueriesWithByteLimit(); + final long subqueriesFallingBackToRowBasedLimit = statsProvider.subqueriesFallingBackToRowLimit(); + final long subqueriesFallingBackDueToUnsufficientTypeInfo = statsProvider.subqueriesFallingBackDueToUnsufficientTypeInfo(); + final long subqueriesFallingBackDueToUnknownReason = statsProvider.subqueriesFallingBackDueUnknownReason(); + final long queriesExceedingRowLimit = statsProvider.queriesExceedingRowLimit(); + final long queriesExceedingByteLimit = statsProvider.queriesExceedingByteLimit(); + + Map diff = keyedDiff.to( + KEY, + ImmutableMap.of( + ROW_LIMIT_COUNT, subqueriesWithRowBasedLimit, + BYTE_LIMIT_COUNT, subqueriesWithByteBasedLimit, + FALLBACK_COUNT, subqueriesFallingBackToRowBasedLimit, + INSUFFICIENT_TYPE_COUNT, subqueriesFallingBackDueToUnsufficientTypeInfo, + UNKNOWN_REASON_COUNT, subqueriesFallingBackDueToUnknownReason, + ROW_LIMIT_EXCEEDED_COUNT, queriesExceedingRowLimit, + BYTE_LIMIT_EXCEEDED_COUNT, queriesExceedingByteLimit + ) + ); + + if (diff != null) { + for (Map.Entry diffEntry : diff.entrySet()) { + emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue())); + } + } + + return true; + } +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java new file mode 100644 index 00000000000..c7007765496 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SubqueryCountStatsProvider.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Collects the metrics corresponding to the subqueries and their materialization. + */ +public class SubqueryCountStatsProvider +{ + + private final AtomicLong successfulSubqueriesWithRowLimit = new AtomicLong(); + private final AtomicLong successfulSubqueriesWithByteLimit = new AtomicLong(); + private final AtomicLong subqueriesFallingBackToRowLimit = new AtomicLong(); + private final AtomicLong subqueriesFallingBackDueToUnsufficientTypeInfo = new AtomicLong(); + private final AtomicLong subqueriesFallingBackDueToUnknownReason = new AtomicLong(); + private final AtomicLong queriesExceedingRowLimit = new AtomicLong(); + private final AtomicLong queriesExceedingByteLimit = new AtomicLong(); + + /** + * @return Count of subqueries where the results are materialized as rows i.e. {@code List} + */ + public long subqueriesWithRowLimit() + { + return successfulSubqueriesWithRowLimit.get(); + } + + /** + * @return Count of subqueries where the results are materialized as {@link org.apache.druid.frame.Frame} + */ + public long subqueriesWithByteLimit() + { + return successfulSubqueriesWithByteLimit.get(); + } + + /** + * @return Count of subqueries where the results are + */ + public long subqueriesFallingBackToRowLimit() + { + return subqueriesFallingBackToRowLimit.get(); + } + + /** + * @return Count of the subset of subqueries that are falling back due to insufficient type information in the + * {@link org.apache.druid.segment.column.RowSignature}. This is expected to be the most common and already known + * cause of fallback, therefore this is added as a separate metric + */ + public long subqueriesFallingBackDueToUnsufficientTypeInfo() + { + return subqueriesFallingBackDueToUnsufficientTypeInfo.get(); + } + + /** + * @return Count of the subset of subqueries that are falling back due to insufficient an unknown error. This can be due to a + * few known reasons like columnar frames not supporting the array types right now, or due to unknown errors while + * performing the materialization + */ + public long subqueriesFallingBackDueUnknownReason() + { + return subqueriesFallingBackDueToUnknownReason.get(); + } + + /** + * @return Number of queries that fail due to their subqueries exceeding the prescribed row limit + */ + public long queriesExceedingRowLimit() + { + return queriesExceedingRowLimit.get(); + } + + /** + * @return Number of subqueries that fail due to their subqueries exceeding the prescribed byte limit + */ + public long queriesExceedingByteLimit() + { + return queriesExceedingByteLimit.get(); + } + + + public void incrementSubqueriesWithRowLimit() + { + successfulSubqueriesWithRowLimit.incrementAndGet(); + } + + public void incrementSubqueriesWithByteLimit() + { + successfulSubqueriesWithByteLimit.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackToRowLimit() + { + subqueriesFallingBackToRowLimit.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackDueToUnsufficientTypeInfo() + { + subqueriesFallingBackDueToUnsufficientTypeInfo.incrementAndGet(); + } + + public void incrementSubqueriesFallingBackDueToUnknownReason() + { + subqueriesFallingBackDueToUnknownReason.incrementAndGet(); + } + + public void incrementQueriesExceedingRowLimit() + { + queriesExceedingRowLimit.incrementAndGet(); + } + + public void incrementQueriesExceedingByteLimit() + { + queriesExceedingByteLimit.incrementAndGet(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 7180b03983f..fcdc894f61f 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -1439,7 +1439,8 @@ public class ClientQuerySegmentWalkerTest ), conglomerate, joinableFactory, - serverConfig + serverConfig, + null ); } diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 1c9f15acf79..63a751bbb89 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -22,6 +22,7 @@ package org.apache.druid.server; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BrokerParallelMergeConfig; @@ -79,6 +80,7 @@ import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -116,7 +118,8 @@ public class QueryStackTests final QuerySegmentWalker localWalker, final QueryRunnerFactoryConglomerate conglomerate, final JoinableFactory joinableFactory, - final ServerConfig serverConfig + final ServerConfig serverConfig, + final LookupExtractorFactoryContainerProvider lookupManager ) { return new ClientQuerySegmentWalker( @@ -161,7 +164,17 @@ public class QueryStackTests { return false; } - } + }, + lookupManager, + new DruidHttpClientConfig() + { + @Override + public int getNumConnections() + { + return 1; + } + }, + new SubqueryCountStatsProvider() ); } diff --git a/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java b/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java new file mode 100644 index 00000000000..07cecb59680 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/SubqueryGuardrailHelperTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.query.lookup.LookupExtractorFactory; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; +import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +/** + * Tests that the subquery limit utils gives out valid byte limits when supplied with 'auto'. + * Assuming that the average segment size is 500 MB to 1 GB, and optimal number of rows in the segment is 5 million, the + * typical bytes per row can be from 100 bytes to 200 bytes. We can keep this in mind while trying to look through the test + * values + */ +public class SubqueryGuardrailHelperTest +{ + + @Test + public void testConvertSubqueryLimitStringToLongWithoutLookups() + { + Assert.assertEquals( + 10737418L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("512MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 13421772L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("640MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 16106127L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("768MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 21474836L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("1GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 171798691L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("8GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 429496729L, + fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("20GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + } + + @Test + public void testConvertSubqueryLimitStringToLongWithLookups() + { + Assert.assertEquals( + 10527703L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("512MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 13212057L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("640MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 15896412, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("768MiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 21265121L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("1GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 171588976L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("8GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + + Assert.assertEquals( + 429287014L, + fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("20GiB"), 25) + .convertSubqueryLimitStringToLong("auto") + ); + } + + + private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForNoLookups(long maxMemoryInJvm, int brokerNumHttpConnections) + { + return new SubqueryGuardrailHelper(null, maxMemoryInJvm, brokerNumHttpConnections); + } + + private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForLookups(long maxMemoryInJvm, int brokerNumHttpConnections) + { + return new SubqueryGuardrailHelper(lookupManager(), maxMemoryInJvm, brokerNumHttpConnections); + } + + + private static long humanReadableSizeToBytes(String humanReadableSize) + { + return new HumanReadableBytes(humanReadableSize).getBytes(); + } + + public static LookupExtractorFactoryContainerProvider lookupManager() + { + LookupExtractorFactoryContainerProvider lookupManager = EasyMock.mock(LookupExtractorFactoryContainerProvider.class); + + EasyMock.expect(lookupManager.getAllLookupNames()).andReturn(ImmutableSet.of("lookupFoo", "lookupBar")).anyTimes(); + + LookupExtractorFactoryContainer lookupFooContainer = EasyMock.mock(LookupExtractorFactoryContainer.class); + EasyMock.expect(lookupManager.get("lookupFoo")).andReturn(Optional.of(lookupFooContainer)); + LookupExtractorFactory lookupFooExtractorFactory = EasyMock.mock(LookupExtractorFactory.class); + EasyMock.expect(lookupFooContainer.getLookupExtractorFactory()).andReturn(lookupFooExtractorFactory); + LookupExtractor lookupFooExtractor = EasyMock.mock(LookupExtractor.class); + EasyMock.expect(lookupFooExtractorFactory.get()).andReturn(lookupFooExtractor); + EasyMock.expect(lookupFooExtractor.estimateHeapFootprint()).andReturn(humanReadableSizeToBytes("10MiB")); + + EasyMock.expect(lookupManager.get("lookupBar")).andReturn(Optional.empty()); + + EasyMock.replay(lookupManager, lookupFooContainer, lookupFooExtractor, lookupFooExtractorFactory); + + return lookupManager; + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index dd9c3c3c0b5..1de8a0f1d47 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -69,6 +69,7 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; +import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.guice.SqlModule; import org.apache.druid.timeline.PruneLoadSpec; @@ -146,6 +147,7 @@ public class CliBroker extends ServerRunnable binder.bind(BrokerQueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, BrokerQueryResource.class); binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class); + binder.bind(SubqueryCountStatsProvider.class).toInstance(new SubqueryCountStatsProvider()); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index b43a6515956..a47bbcf9577 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C ), conglomerate, joinableFactoryWrapper.getJoinableFactory(), - new ServerConfig() + new ServerConfig(), + LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER ); }