Auto configuration for maxSubqueryBytes (#14808)

This change introduces a new monitor, SubqueryCountStatsMonitor, which emits metrics about subqueries and their execution. In addition, users can now set the limit to "auto", letting Druid automatically determine the number of bytes available per query for inlining its subqueries' results.
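
A minimal sketch (the wrapper class below is hypothetical and not part of this commit) of the two ways the limit can now be supplied through the query context; the `maxSubqueryBytes` key corresponds to `QueryContexts.MAX_SUBQUERY_BYTES_KEY` in the diff below:

import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class MaxSubqueryBytesContextSketch
{
  public static void main(String[] args)
  {
    // Explicit limit: inlined subquery results for this query may occupy at most ~500 MB of heap.
    Map<String, Object> explicitLimit =
        ImmutableMap.<String, Object>of("maxSubqueryBytes", 500_000_000);

    // 'auto': the Broker derives the per-query limit from its heap size,
    // lookup footprint, and number of HTTP connections.
    Map<String, Object> autoLimit =
        ImmutableMap.<String, Object>of("maxSubqueryBytes", "auto");

    System.out.println(explicitLimit);
    System.out.println(autoLimit);
  }
}
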
Laksh Singla 2023-09-06 05:47:19 +00:00 committed by GitHub
parent 959148ad37
commit 6ee0b06e38
16 changed files with 682 additions and 36 deletions

View File

@ -395,6 +395,7 @@ Metric monitoring is an essential part of Druid operations. The following monit
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured. |
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes, along with other statistics related to subquery execution.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|

View File

@ -72,6 +72,13 @@ Metrics may have additional dimensions beyond those listed above.
|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`|
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/fallback/insufficientType/count`|Number of subqueries which cannot be materialized as frames due to insufficient type information in the row signature.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due to other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
### Historical

View File

@ -74,6 +74,7 @@ import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@ -310,6 +311,15 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
Assert.assertNotNull(expectedResults);
Assert.assertThat(expectedResults, IsInstanceOf.instanceOf(List.class));
DruidHttpClientConfig httpClientConfig = new DruidHttpClientConfig()
{
@Override
public long getMaxQueuedBytes()
{
return 0L;
}
};
CachingClusteredClient baseClient = new CachingClusteredClient(
warehouse,
new TimelineServerView()
@ -354,14 +364,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
jsonMapper,
new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1),
new CacheConfig(),
new DruidHttpClientConfig()
{
@Override
public long getMaxQueuedBytes()
{
return 0L;
}
},
httpClientConfig,
new BrokerParallelMergeConfig(),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
@ -384,7 +387,10 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
jsonMapper,
serverConfig,
null,
new CacheConfig()
new CacheConfig(),
null,
httpClientConfig,
new SubqueryCountStatsProvider()
);
defineMocks();

View File

@ -347,9 +347,14 @@ public class QueryContext
return getInt(QueryContexts.MAX_SUBQUERY_ROWS_KEY, defaultSize);
}
public long getMaxSubqueryMemoryBytes(long defaultMemoryBytes)
public String getMaxSubqueryMemoryBytes(String defaultMemoryBytes)
{
return getLong(QueryContexts.MAX_SUBQUERY_BYTES_KEY, defaultMemoryBytes);
// Generic to allow for both strings and numbers to be passed as values in the query context
Object maxSubqueryBytesObject = get(QueryContexts.MAX_SUBQUERY_BYTES_KEY);
if (maxSubqueryBytesObject == null) {
maxSubqueryBytesObject = defaultMemoryBytes;
}
return String.valueOf(maxSubqueryBytesObject);
}
public boolean isUseNestedForUnknownTypeInSubquery(boolean defaultUseNestedForUnkownTypeInSubquery)

View File

@ -325,6 +325,23 @@ public class QueryContextTest
assertThrows(BadQueryContextException.class, () -> context.getHumanReadableBytes("m6", HumanReadableBytes.ZERO));
}
@Test
public void testGetMaxSubqueryBytes()
{
final QueryContext context1 = new QueryContext(
ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, 500_000_000)
);
assertEquals("500000000", context1.getMaxSubqueryMemoryBytes(null));
final QueryContext context2 = new QueryContext(
ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto")
);
assertEquals("auto", context2.getMaxSubqueryMemoryBytes(null));
final QueryContext context3 = new QueryContext(ImmutableMap.of());
assertEquals("unlimited", context3.getMaxSubqueryMemoryBytes("unlimited"));
}
@Test
public void testDefaultEnableQueryDebugging()
{

View File

@ -53,6 +53,7 @@ import java.io.File;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -310,6 +311,9 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
@Override
public Set<String> getAllLookupNames()
{
if (stateRef.get() == null) {
return Collections.emptySet();
}
return stateRef.get().lookupMap.keySet();
}

View File

@ -28,8 +28,11 @@ import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
@ -57,10 +60,12 @@ import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -103,6 +108,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final ServerConfig serverConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
private final SubqueryGuardrailHelper subqueryGuardrailHelper;
private final SubqueryCountStatsProvider subqueryStatsProvider;
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
@ -114,7 +121,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
ObjectMapper objectMapper,
ServerConfig serverConfig,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
LookupExtractorFactoryContainerProvider lookupManager,
DruidHttpClientConfig httpClientConfig,
SubqueryCountStatsProvider subqueryStatsProvider
)
{
this.emitter = emitter;
@ -127,6 +137,12 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
this.serverConfig = serverConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.subqueryGuardrailHelper = new SubqueryGuardrailHelper(
lookupManager,
Runtime.getRuntime().maxMemory(),
httpClientConfig.getNumConnections()
);
this.subqueryStatsProvider = subqueryStatsProvider;
}
@Inject
@ -140,7 +156,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
ObjectMapper objectMapper,
ServerConfig serverConfig,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
LookupExtractorFactoryContainerProvider lookupManager,
@Client DruidHttpClientConfig httpClientConfig,
SubqueryCountStatsProvider subqueryStatsProvider
)
{
this(
@ -153,7 +172,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
objectMapper,
serverConfig,
cache,
cacheConfig
cacheConfig,
lookupManager,
httpClientConfig,
subqueryStatsProvider
);
}
@ -175,10 +197,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final DataSource freeTradeDataSource = globalizeIfPossible(newQuery.getDataSource());
// do an inlining dry run to see if any inlining is necessary, without actually running the queries.
final int maxSubqueryRows = query.context().getMaxSubqueryRows(serverConfig.getMaxSubqueryRows());
final long maxSubqueryMemory = query.context().getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes());
final String maxSubqueryMemoryString = query.context()
.getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryBytes());
final long maxSubqueryMemory = subqueryGuardrailHelper.convertSubqueryLimitStringToLong(maxSubqueryMemoryString);
final boolean useNestedForUnknownTypeInSubquery = query.context()
.isUseNestedForUnknownTypeInSubquery(serverConfig.isuseNestedForUnknownTypeInSubquery());
final DataSource inlineDryRun = inlineIfNecessary(
freeTradeDataSource,
toolChest,
@ -426,7 +451,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
cannotMaterializeToFrames,
maxSubqueryRows,
maxSubqueryMemory,
useNestedForUnknownTypeInSubquery
useNestedForUnknownTypeInSubquery,
subqueryStatsProvider
);
} else {
// Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@ -641,7 +667,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final AtomicBoolean cannotMaterializeToFrames,
final int limit,
long memoryLimit,
boolean useNestedForUnknownTypeInSubquery
boolean useNestedForUnknownTypeInSubquery,
SubqueryCountStatsProvider subqueryStatsProvider
)
{
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@ -651,21 +678,25 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, cannotMaterializeToFrames.get())) {
case ROW_LIMIT:
if (limitAccumulator.get() >= rowLimitToUse) {
subqueryStatsProvider.incrementQueriesExceedingRowLimit();
throw ResourceLimitExceededException.withMessage(
"Cannot issue the query, subqueries generated results beyond maximum[%d] rows",
rowLimitToUse
);
}
subqueryStatsProvider.incrementSubqueriesWithRowLimit();
dataSource = materializeResultsAsArray(
query,
results,
toolChest,
limitAccumulator,
limit
limit,
subqueryStatsProvider
);
break;
case MEMORY_LIMIT:
if (memoryLimitAccumulator.get() >= memoryLimit) {
subqueryStatsProvider.incrementQueriesExceedingByteLimit();
throw ResourceLimitExceededException.withMessage(
"Cannot issue the query, subqueries generated results beyond maximum[%d] bytes",
memoryLimit
@ -678,30 +709,36 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
limitAccumulator,
memoryLimitAccumulator,
memoryLimit,
useNestedForUnknownTypeInSubquery
useNestedForUnknownTypeInSubquery,
subqueryStatsProvider
);
if (!maybeDataSource.isPresent()) {
cannotMaterializeToFrames.set(true);
// Check whether the accumulated row count has already exceeded the row limit before falling back to materializing the results as rows
if (memoryLimitAccumulator.get() >= memoryLimit) {
if (limitAccumulator.get() >= rowLimitToUse) {
subqueryStatsProvider.incrementQueriesExceedingRowLimit();
throw ResourceLimitExceededException.withMessage(
"Cannot issue the query, subqueries generated results beyond maximum[%d] bytes",
memoryLimit
"Cannot issue the query, subqueries generated results beyond maximum[%d] rows",
rowLimitToUse
);
}
subqueryStatsProvider.incrementSubqueriesWithRowLimit();
subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit();
dataSource = materializeResultsAsArray(
query,
results,
toolChest,
limitAccumulator,
limit
limit,
subqueryStatsProvider
);
} else {
subqueryStatsProvider.incrementSubqueriesWithByteLimit();
dataSource = maybeDataSource.get();
}
break;
default:
throw new IAE("Only row based and memory based limiting is supported");
throw DruidException.defensive("Only row based and memory based limiting is supported");
}
return dataSource;
}
@ -717,7 +754,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final AtomicInteger limitAccumulator,
final AtomicLong memoryLimitAccumulator,
long memoryLimit,
boolean useNestedForUnknownTypeInSubquery
boolean useNestedForUnknownTypeInSubquery,
final SubqueryCountStatsProvider subqueryStatsProvider
)
{
Optional<Sequence<FrameSignaturePair>> framesOptional;
@ -730,7 +768,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
useNestedForUnknownTypeInSubquery
);
}
catch (UnsupportedColumnTypeException e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
return Optional.empty();
}
catch (Exception e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
+ "while conversion. Defaulting to materializing the results as rows");
return Optional.empty();
@ -747,6 +791,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
subqueryStatsProvider.incrementQueriesExceedingByteLimit();
throw ResourceLimitExceededException.withMessage(
"Subquery generated results beyond maximum[%d] bytes",
memoryLimit
@ -767,7 +812,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final Sequence<T> results,
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final int limit
final int limit,
final SubqueryCountStatsProvider subqueryStatsProvider
)
{
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@ -779,6 +825,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
resultList,
(acc, in) -> {
if (limitAccumulator.getAndIncrement() >= rowLimitToUse) {
subqueryStatsProvider.incrementQueriesExceedingRowLimit();
throw ResourceLimitExceededException.withMessage(
"Subquery generated results beyond maximum[%d] rows",
rowLimitToUse

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
* Helps the {@link ClientQuerySegmentWalker} compute the heap space available per query for materializing the inlined
* results of its subqueries
*/
public class SubqueryGuardrailHelper
{
private static final double SUBQUERY_MEMORY_BYTES_FRACTION = 0.5;
private static final Logger log = new Logger(SubqueryGuardrailHelper.class);
public static final String UNLIMITED_LIMIT_VALUE = "unlimited";
public static final String AUTO_LIMIT_VALUE = "auto";
public static final Long UNLIMITED_LIMIT_REPRESENTATION = -1L;
private final long autoLimitBytes;
public SubqueryGuardrailHelper(
final LookupExtractorFactoryContainerProvider lookupManager,
final long maxMemoryInJvm,
final int brokerNumHttpConnections
)
{
final DateTime start = DateTimes.nowUtc();
autoLimitBytes = computeLimitBytesForAuto(lookupManager, maxMemoryInJvm, brokerNumHttpConnections);
final long startupTimeMs = new Interval(start, DateTimes.nowUtc()).toDurationMillis();
log.info("Took [%d] ms to initialize the SubqueryGuardrailHelper.", startupTimeMs);
if (startupTimeMs >= 10_000) {
log.warn("Took more than 10 seconds to initialize the SubqueryGuardrailHelper. "
+ "This happens when the lookup sizes are very large. "
+ "Consider lowering the size of the lookups to reduce the initialization time."
);
}
log.info("Each query has a memory limit of [%d] bytes to materialize its subqueries' results if auto "
+ "limit is used", autoLimitBytes);
}
public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit)
{
if (UNLIMITED_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) {
return UNLIMITED_LIMIT_REPRESENTATION;
}
if (AUTO_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) {
return autoLimitBytes;
}
long retVal;
try {
retVal = Long.parseLong(maxSubqueryLimit);
}
catch (NumberFormatException e) {
throw InvalidInput.exception(
e,
"Unable to parse the provided maxSubqueryLimit [%s] to a valid number. Valid values for the "
+ "maxSubqueryLimits can be 'auto', 'unlimited' or a positive number representing bytes to reserve.",
maxSubqueryLimit
);
}
// Negative values are rejected; a value larger than Long.MAX_VALUE would already have failed to parse above
if (retVal < 0) {
throw InvalidInput.exception("Limit too large");
}
return retVal;
}
/**
* Computes the byte limit when 'auto' is passed as a parameter. It subtracts the estimated lookup footprint from the
* total JVM heap, takes a fraction of the remainder as the space available for inlining subquery results, and divides
* that by the maximum number of concurrent queries. The maximum number of concurrent queries that Druid can run is
* usually limited by the Broker's HTTP thread pool size.
*
* Consider a JVM running locally with 4 GB heap size, 20 Broker threads and 100 MB space required by the lookups.
* Each query under 'auto' would then get 97.5 MB to materialize its results. With the default limit of 100,000 rows
* and each row consuming 300 - 700 bytes of heap space, a subquery would consume roughly 30 MB to 70 MB, which is in
* the same range as what 'auto' reserves. Actual usage varies widely with row size, which is exactly where the
* byte-based 'auto' limit helps: the number of rows that can be materialized is bounded by the memory they actually
* consume rather than by a fixed row count.
*/
private static long computeLimitBytesForAuto(
final LookupExtractorFactoryContainerProvider lookupManager,
final long maxMemoryInJvm,
final int brokerNumHttpConnections
)
{
long memoryInJvmWithoutLookups = maxMemoryInJvm - computeLookupFootprint(lookupManager);
long memoryInJvmForSubqueryResultsInlining = (long) (memoryInJvmWithoutLookups * SUBQUERY_MEMORY_BYTES_FRACTION);
long memoryInJvmForSubqueryResultsInliningPerQuery = memoryInJvmForSubqueryResultsInlining
/ brokerNumHttpConnections;
return Math.max(memoryInJvmForSubqueryResultsInliningPerQuery, 1L);
}
/**
* Computes the size occupied by the lookups. If the size of the lookup cannot be computed, it skips over the lookup
*/
private static long computeLookupFootprint(final LookupExtractorFactoryContainerProvider lookupManager)
{
if (lookupManager == null || lookupManager.getAllLookupNames() == null) {
log.warn("Failed to get the lookupManager for estimating lookup size. Skipping.");
return 0;
}
int lookupCount = 0;
long lookupFootprint = 0;
for (final String lookupName : lookupManager.getAllLookupNames()) {
final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null);
if (container != null) {
try {
final LookupExtractor extractor = container.getLookupExtractorFactory().get();
lookupFootprint += extractor.estimateHeapFootprint();
lookupCount++;
}
catch (Exception e) {
log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName);
}
}
}
log.debug("Lookup footprint: [%d] lookups with [%,d] total bytes.", lookupCount, lookupFootprint);
return lookupFootprint;
}
}
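
For reference, a standalone sketch (not part of this commit) of the arithmetic that the computeLimitBytesForAuto javadoc above walks through, using the same illustrative figures of a 4 GB heap, 100 MB of lookups, and 20 Broker HTTP connections:

public class AutoLimitArithmeticSketch
{
  public static void main(String[] args)
  {
    final double subqueryMemoryBytesFraction = 0.5;  // mirrors SUBQUERY_MEMORY_BYTES_FRACTION above
    final long maxHeapBytes = 4_000_000_000L;        // 4 GB Broker heap
    final long lookupFootprintBytes = 100_000_000L;  // 100 MB of lookups
    final int brokerNumHttpConnections = 20;         // upper bound on concurrent queries

    final long withoutLookups = maxHeapBytes - lookupFootprintBytes;
    final long forInlining = (long) (withoutLookups * subqueryMemoryBytesFraction);
    final long perQuery = Math.max(forInlining / brokerNumHttpConnections, 1L);

    // Prints 97,500,000 bytes, i.e. the 97.5 MB per query quoted in the javadoc.
    System.out.printf("per-query 'auto' limit = %,d bytes%n", perQuery);
  }
}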

View File

@ -44,7 +44,7 @@ import java.util.zip.Deflater;
public class ServerConfig
{
public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;
public static final long DEFAULT_MAX_SUBQUERY_BYTES = -1L;
public static final String DEFAULT_MAX_SUBQUERY_BYTES = "unlimited";
private static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY = false;
@ -61,7 +61,7 @@ public class ServerConfig
long defaultQueryTimeout,
long maxScatterGatherBytes,
int maxSubqueryRows,
long maxSubqueryBytes,
String maxSubqueryBytes,
boolean useNestedForUnknownTypeInSubquery,
long maxQueryTimeout,
int maxRequestHeaderSize,
@ -140,7 +140,7 @@ public class ServerConfig
private int maxSubqueryRows = 100000;
@JsonProperty
private long maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES;
private String maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES;
@JsonProperty
private boolean useNestedForUnknownTypeInSubquery = DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY;
@ -231,7 +231,7 @@ public class ServerConfig
return maxSubqueryRows;
}
public long getMaxSubqueryBytes()
public String getMaxSubqueryBytes()
{
return maxSubqueryBytes;
}
@ -322,7 +322,7 @@ public class ServerConfig
enableRequestLimit == that.enableRequestLimit &&
defaultQueryTimeout == that.defaultQueryTimeout &&
maxSubqueryRows == that.maxSubqueryRows &&
maxSubqueryBytes == that.maxSubqueryBytes &&
Objects.equals(maxSubqueryBytes, that.maxSubqueryBytes) &&
useNestedForUnknownTypeInSubquery == that.useNestedForUnknownTypeInSubquery &&
maxQueryTimeout == that.maxQueryTimeout &&
maxRequestHeaderSize == that.maxRequestHeaderSize &&

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.KeyedDiff;
import java.util.Map;
/**
* Monitors and emits the metrics corresponding to the subqueries and their materialization.
*/
public class SubqueryCountStatsMonitor extends AbstractMonitor
{
private static final String KEY = "subqueryCountStats";
private static final String ROW_LIMIT_COUNT = "subquery/rowLimit/count";
private static final String BYTE_LIMIT_COUNT = "subquery/byteLimit/count";
private static final String FALLBACK_COUNT = "subquery/fallback/count";
private static final String INSUFFICIENT_TYPE_COUNT = "subquery/fallback/insufficientType/count";
private static final String UNKNOWN_REASON_COUNT = "subquery/fallback/unknownReason/count";
private static final String ROW_LIMIT_EXCEEDED_COUNT = "query/rowLimit/exceeded/count";
private static final String BYTE_LIMIT_EXCEEDED_COUNT = "query/byteLimit/exceeded/count";
private final KeyedDiff keyedDiff = new KeyedDiff();
private final SubqueryCountStatsProvider statsProvider;
@Inject
public SubqueryCountStatsMonitor(SubqueryCountStatsProvider statsProvider)
{
this.statsProvider = statsProvider;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final long subqueriesWithRowBasedLimit = statsProvider.subqueriesWithRowLimit();
final long subqueriesWithByteBasedLimit = statsProvider.subqueriesWithByteLimit();
final long subqueriesFallingBackToRowBasedLimit = statsProvider.subqueriesFallingBackToRowLimit();
final long subqueriesFallingBackDueToUnsufficientTypeInfo = statsProvider.subqueriesFallingBackDueToUnsufficientTypeInfo();
final long subqueriesFallingBackDueToUnknownReason = statsProvider.subqueriesFallingBackDueUnknownReason();
final long queriesExceedingRowLimit = statsProvider.queriesExceedingRowLimit();
final long queriesExceedingByteLimit = statsProvider.queriesExceedingByteLimit();
Map<String, Long> diff = keyedDiff.to(
KEY,
ImmutableMap.of(
ROW_LIMIT_COUNT, subqueriesWithRowBasedLimit,
BYTE_LIMIT_COUNT, subqueriesWithByteBasedLimit,
FALLBACK_COUNT, subqueriesFallingBackToRowBasedLimit,
INSUFFICIENT_TYPE_COUNT, subqueriesFallingBackDueToUnsufficientTypeInfo,
UNKNOWN_REASON_COUNT, subqueriesFallingBackDueToUnknownReason,
ROW_LIMIT_EXCEEDED_COUNT, queriesExceedingRowLimit,
BYTE_LIMIT_EXCEEDED_COUNT, queriesExceedingByteLimit
)
);
if (diff != null) {
for (Map.Entry<String, Long> diffEntry : diff.entrySet()) {
emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue()));
}
}
return true;
}
}
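
A rough usage sketch, assuming only the two classes added by this commit plus Druid's existing NoopServiceEmitter test helper: the segment walker increments counters on a shared provider, and the monitor (normally registered through `druid.monitoring.monitors` and driven by the monitor scheduler) emits the per-period deltas.

import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.metrics.SubqueryCountStatsMonitor;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;

public class SubqueryCountStatsMonitorSketch
{
  public static void main(String[] args)
  {
    final SubqueryCountStatsProvider statsProvider = new SubqueryCountStatsProvider();

    // ClientQuerySegmentWalker calls these as subqueries are materialized or rejected.
    statsProvider.incrementSubqueriesWithByteLimit();
    statsProvider.incrementSubqueriesWithRowLimit();
    statsProvider.incrementSubqueriesFallingBackToRowLimit();

    // In a real deployment the monitor scheduler invokes doMonitor on the emission period;
    // here it is invoked directly. NoopServiceEmitter is assumed to have its usual no-arg constructor.
    final SubqueryCountStatsMonitor monitor = new SubqueryCountStatsMonitor(statsProvider);
    monitor.doMonitor(new NoopServiceEmitter());
  }
}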

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import java.util.concurrent.atomic.AtomicLong;
/**
* Collects the metrics corresponding to the subqueries and their materialization.
*/
public class SubqueryCountStatsProvider
{
private final AtomicLong successfulSubqueriesWithRowLimit = new AtomicLong();
private final AtomicLong successfulSubqueriesWithByteLimit = new AtomicLong();
private final AtomicLong subqueriesFallingBackToRowLimit = new AtomicLong();
private final AtomicLong subqueriesFallingBackDueToUnsufficientTypeInfo = new AtomicLong();
private final AtomicLong subqueriesFallingBackDueToUnknownReason = new AtomicLong();
private final AtomicLong queriesExceedingRowLimit = new AtomicLong();
private final AtomicLong queriesExceedingByteLimit = new AtomicLong();
/**
* @return Count of subqueries where the results are materialized as rows i.e. {@code List<Object>}
*/
public long subqueriesWithRowLimit()
{
return successfulSubqueriesWithRowLimit.get();
}
/**
* @return Count of subqueries where the results are materialized as {@link org.apache.druid.frame.Frame}
*/
public long subqueriesWithByteLimit()
{
return successfulSubqueriesWithByteLimit.get();
}
/**
* @return Count of subqueries whose results could not be materialized as frames and instead fell back to being
* materialized as rows
*/
public long subqueriesFallingBackToRowLimit()
{
return subqueriesFallingBackToRowLimit.get();
}
/**
* @return Count of the subset of subqueries that are falling back due to insufficient type information in the
* {@link org.apache.druid.segment.column.RowSignature}. This is expected to be the most common and already known
* cause of fallback, therefore this is added as a separate metric
*/
public long subqueriesFallingBackDueToUnsufficientTypeInfo()
{
return subqueriesFallingBackDueToUnsufficientTypeInfo.get();
}
/**
* @return Count of the subset of subqueries that fall back due to an unknown error. This can be due to a few known
* gaps, such as columnar frames not yet supporting array types, or due to genuinely unknown errors encountered while
* performing the materialization
*/
public long subqueriesFallingBackDueUnknownReason()
{
return subqueriesFallingBackDueToUnknownReason.get();
}
/**
* @return Number of queries that fail due to their subqueries exceeding the prescribed row limit
*/
public long queriesExceedingRowLimit()
{
return queriesExceedingRowLimit.get();
}
/**
* @return Number of queries that fail due to their subqueries exceeding the prescribed byte limit
*/
public long queriesExceedingByteLimit()
{
return queriesExceedingByteLimit.get();
}
public void incrementSubqueriesWithRowLimit()
{
successfulSubqueriesWithRowLimit.incrementAndGet();
}
public void incrementSubqueriesWithByteLimit()
{
successfulSubqueriesWithByteLimit.incrementAndGet();
}
public void incrementSubqueriesFallingBackToRowLimit()
{
subqueriesFallingBackToRowLimit.incrementAndGet();
}
public void incrementSubqueriesFallingBackDueToUnsufficientTypeInfo()
{
subqueriesFallingBackDueToUnsufficientTypeInfo.incrementAndGet();
}
public void incrementSubqueriesFallingBackDueToUnknownReason()
{
subqueriesFallingBackDueToUnknownReason.incrementAndGet();
}
public void incrementQueriesExceedingRowLimit()
{
queriesExceedingRowLimit.incrementAndGet();
}
public void incrementQueriesExceedingByteLimit()
{
queriesExceedingByteLimit.incrementAndGet();
}
}

View File

@ -1439,7 +1439,8 @@ public class ClientQuerySegmentWalkerTest
),
conglomerate,
joinableFactory,
serverConfig
serverConfig,
null
);
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.server;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BrokerParallelMergeConfig;
@ -79,6 +80,7 @@ import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -116,7 +118,8 @@ public class QueryStackTests
final QuerySegmentWalker localWalker,
final QueryRunnerFactoryConglomerate conglomerate,
final JoinableFactory joinableFactory,
final ServerConfig serverConfig
final ServerConfig serverConfig,
final LookupExtractorFactoryContainerProvider lookupManager
)
{
return new ClientQuerySegmentWalker(
@ -161,7 +164,17 @@ public class QueryStackTests
{
return false;
}
}
},
lookupManager,
new DruidHttpClientConfig()
{
@Override
public int getNumConnections()
{
return 1;
}
},
new SubqueryCountStatsProvider()
);
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Optional;
/**
* Tests that the subquery limit utils gives out valid byte limits when supplied with 'auto'.
* Assuming that the average segment size is 500 MB to 1 GB, and optimal number of rows in the segment is 5 million, the
* typical bytes per row can be from 100 bytes to 200 bytes. We can keep this in mind while trying to look through the test
* values
*/
public class SubqueryGuardrailHelperTest
{
@Test
public void testConvertSubqueryLimitStringToLongWithoutLookups()
{
Assert.assertEquals(
10737418L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("512MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
13421772L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("640MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
16106127L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("768MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
21474836L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("1GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
171798691L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("8GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
429496729L,
fetchSubqueryLimitUtilsForNoLookups(humanReadableSizeToBytes("20GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
}
@Test
public void testConvertSubqueryLimitStringToLongWithLookups()
{
Assert.assertEquals(
10527703L,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("512MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
13212057L,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("640MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
15896412,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("768MiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
21265121L,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("1GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
171588976L,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("8GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
Assert.assertEquals(
429287014L,
fetchSubqueryLimitUtilsForLookups(humanReadableSizeToBytes("20GiB"), 25)
.convertSubqueryLimitStringToLong("auto")
);
}
private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForNoLookups(long maxMemoryInJvm, int brokerNumHttpConnections)
{
return new SubqueryGuardrailHelper(null, maxMemoryInJvm, brokerNumHttpConnections);
}
private SubqueryGuardrailHelper fetchSubqueryLimitUtilsForLookups(long maxMemoryInJvm, int brokerNumHttpConnections)
{
return new SubqueryGuardrailHelper(lookupManager(), maxMemoryInJvm, brokerNumHttpConnections);
}
private static long humanReadableSizeToBytes(String humanReadableSize)
{
return new HumanReadableBytes(humanReadableSize).getBytes();
}
public static LookupExtractorFactoryContainerProvider lookupManager()
{
LookupExtractorFactoryContainerProvider lookupManager = EasyMock.mock(LookupExtractorFactoryContainerProvider.class);
EasyMock.expect(lookupManager.getAllLookupNames()).andReturn(ImmutableSet.of("lookupFoo", "lookupBar")).anyTimes();
LookupExtractorFactoryContainer lookupFooContainer = EasyMock.mock(LookupExtractorFactoryContainer.class);
EasyMock.expect(lookupManager.get("lookupFoo")).andReturn(Optional.of(lookupFooContainer));
LookupExtractorFactory lookupFooExtractorFactory = EasyMock.mock(LookupExtractorFactory.class);
EasyMock.expect(lookupFooContainer.getLookupExtractorFactory()).andReturn(lookupFooExtractorFactory);
LookupExtractor lookupFooExtractor = EasyMock.mock(LookupExtractor.class);
EasyMock.expect(lookupFooExtractorFactory.get()).andReturn(lookupFooExtractor);
EasyMock.expect(lookupFooExtractor.estimateHeapFootprint()).andReturn(humanReadableSizeToBytes("10MiB"));
EasyMock.expect(lookupManager.get("lookupBar")).andReturn(Optional.empty());
EasyMock.replay(lookupManager, lookupFooContainer, lookupFooExtractor, lookupFooExtractorFactory);
return lookupManager;
}
}

View File

@ -69,6 +69,7 @@ import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.timeline.PruneLoadSpec;
@ -146,6 +147,7 @@ public class CliBroker extends ServerRunnable
binder.bind(BrokerQueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, BrokerQueryResource.class);
binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class);
binder.bind(SubqueryCountStatsProvider.class).toInstance(new SubqueryCountStatsProvider());
Jerseys.addResource(binder, BrokerResource.class);
Jerseys.addResource(binder, ClientInfoResource.class);

View File

@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
),
conglomerate,
joinableFactoryWrapper.getJoinableFactory(),
new ServerConfig()
new ServerConfig(),
LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER
);
}