query laning and load shedding (#9407)

* prototype

* merge QueryScheduler and QueryManager

* everything in its right place

* adjustments

* docs

* fixes

* doc fixes

* use resilience4j instead of semaphore

* more tests

* simplify

* checkstyle

* spelling

* oops heh

* remove unused

* simplify

* concurrency tests

* add SqlResource tests, refactor error response

* add json config tests

* use LongAdder instead of AtomicLong

* remove test only stuffs from scheduler

* javadocs, etc

* style

* partial review stuffs

* adjust

* review stuffs

* more javadoc

* error response documentation

* spelling

* preserve user specified lane for NoSchedulingStrategy

* more test, why not

* doc adjustment

* style

* missed review for make a thing a constant

* fixes and tests

* fix test

* Update docs/configuration/index.md

Co-Authored-By: sthetland <steve.hetland@imply.io>

* doc update

Co-authored-by: sthetland <steve.hetland@imply.io>
Clint Wylie 2020-03-10 02:57:16 -07:00 committed by GitHub
parent 75e2051195
commit 8b9fe6f584
47 changed files with 2216 additions and 278 deletions

View File

@ -104,7 +104,10 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@ -338,7 +341,8 @@ public class CachingClusteredClientBenchmark
new CacheConfig(),
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool
forkJoinPool,
new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);
}

View File

@ -214,6 +214,7 @@ def build_compatible_license_names():
compatible_licenses['Apache License, Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['The Apache Software License, Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache-2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0'

View File

@ -1481,9 +1481,35 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|
##### Query laning
*Laning strategies* allow you to control capacity utilization for heterogeneous query workloads. With laning, the broker examines and classifies each query in order to assign it to a 'lane'. Lanes have capacity limits, enforced by the broker, that can be used to ensure sufficient resources are available for other lanes or for interactive queries (which have no lane), or to limit overall throughput for queries within a lane. Requests in excess of a lane's capacity are rejected with an HTTP 429 status code. Example configurations are shown after the property tables below.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.numThreads`|Maximum number of HTTP threads to dedicate to query processing. To save HTTP thread capacity, this should be lower than `druid.server.http.numThreads`.|Unbounded|
|`druid.query.scheduler.laning.strategy`|Query laning strategy to use to assign queries to a lane in order to control capacities for certain classes of queries.|`none`|
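For example, a Broker might dedicate 40 of its 60 HTTP threads to query processing (illustrative values, not recommendations):
```
druid.server.http.numThreads=60
druid.query.scheduler.numThreads=40
```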
##### Laning strategies
###### No laning strategy
In this mode, queries are never assigned a lane, and the concurrent query count will only be limited by `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, if set. This is the default Druid query scheduler operating mode. Enable this strategy explicitly by setting `druid.query.scheduler.laning.strategy` to `none`.
###### 'High/Low' laning strategy
This laning strategy automatically assigns queries with a `priority` below zero to a `low` query lane. Queries with a priority of zero (the default) or above are considered 'interactive'. The limit on `low` queries can be set to a desired percentage of the total capacity (or HTTP thread pool size), reserving capacity for interactive queries. Queries in the `low` lane are _not_ guaranteed their capacity, which may be consumed by interactive queries, but they may use up to this limit when total capacity is available.
If the `low` lane is specified in the [query context](../querying/query-context.md) `lane` parameter, this will override the computed lane.
Enable this strategy by setting `druid.query.scheduler.laning.strategy=hilo`. An example configuration follows the table below.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up.|No default, must be set if using this mode|
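For example, with the illustrative values below, at most `ceil(40 * 0.25) = 10` of the 40 scheduler threads may be occupied by `low` lane queries at once, leaving at least 30 threads that only interactive queries can claim:
```
druid.query.scheduler.numThreads=40
druid.query.scheduler.laning.strategy=hilo
druid.query.scheduler.laning.maxLowPercent=25
```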
##### Server Configuration
Druid uses Jetty to serve HTTP requests.
Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider setting `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query HTTP requests, which are in most cases comparatively short-lived.
|Property|Description|Default|
|--------|-----------|-------|

View File

@ -29,6 +29,7 @@ The query context is used for various query configuration parameters. The follow
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [Broker configuration](../configuration/index.html#broker) |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.html#broker) for more details, and the example following this table.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache |
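For example, a long-running report query could be explicitly assigned to the `low` lane of the 'hilo' laning strategy through its context (a sketch; the available lane names depend on the configured laning strategy):
```
"context" : {
  "priority" : -1,
  "lane" : "low"
}
```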

View File

@ -109,6 +109,8 @@ If a query fails, you will get an HTTP 500 response containing a JSON object wit
}
```
If a query request fails due to being limited by the [query scheduler laning configuration](../configuration/index.md#broker), you will get an HTTP 429 response with the same JSON object schema as an error response, but with an `errorMessage` of the form "Total query capacity exceeded" or "Query capacity exceeded for lane 'low'".
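For example, a query rejected from the `low` lane would receive a response body along these lines (a sketch assembled from the fields of the new `QueryCapacityExceededException`; `host` is null because the rejection happens locally):
```
{
  "error" : "Query capacity exceeded",
  "errorMessage" : "Query capacity exceeded for lane 'low'",
  "errorClass" : "org.apache.druid.server.QueryCapacityExceededException",
  "host" : null
}
```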
The fields in the response are:
|field|description|

View File

@ -65,7 +65,9 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@ -361,7 +363,8 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
return null;
}
},
ForkJoinPool.commonPool()
ForkJoinPool.commonPool(),
new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(

View File

@ -909,7 +909,7 @@ public class RealtimeIndexTaskTest
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{
// do nothing
}

View File

@ -1869,6 +1869,17 @@ libraries:
---
name: Resilience4j
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.3.1
libraries:
- io.github.resilience4j: resilience4j-core
- io.github.resilience4j: resilience4j-bulkhead
---
name: RoaringBitmap
license_category: binary
module: java-core
@ -1880,6 +1891,17 @@ libraries:
---
name: vavr
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 0.10.2
libraries:
- io.vavr: vavr
- io.vavr: vavr-match
---
name: Config Magic
license_category: binary
module: java-core

View File

@ -94,6 +94,7 @@
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<log4j.version>2.8.2</log4j.version>
<netty3.version>3.10.6.Final</netty3.version>
<resilience4j.version>1.3.1</resilience4j.version>
<!-- Spark updated in https://github.com/apache/spark/pull/19884 -->
<netty4.version>4.1.45.Final</netty4.version>
<node.version>v10.14.2</node.version>
@ -1181,6 +1182,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>

View File

@ -144,7 +144,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
)
);
queryWatcher.registerQuery(query, futures);
queryWatcher.registerQueryFuture(query, futures);
try {
return new MergeIterable<>(

View File

@ -178,7 +178,7 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
)
{
try {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {

View File

@ -21,6 +21,7 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.granularity.Granularity;
@ -146,4 +147,10 @@ public interface Query<T>
{
return this;
}
default Query<T> withLane(String lane)
{
return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane));
}
}

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
public class QueryContexts
{
public static final String PRIORITY_KEY = "priority";
public static final String LANE_KEY = "lane";
public static final String TIMEOUT_KEY = "timeout";
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
@ -202,6 +203,11 @@ public class QueryContexts
return parseInt(query, PRIORITY_KEY, defaultValue);
}
public static <T> String getLane(Query<T> query)
{
return (String) query.getContextValue(LANE_KEY);
}
public static <T> boolean getEnableParallelMerges(Query<T> query)
{
return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE);

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
/**
* Base serializable error response
*
* QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
*/
public class QueryException extends RuntimeException
{
private final String errorCode;
private final String errorClass;
private final String host;
public QueryException(Throwable cause, String errorCode, String errorClass, String host)
{
super(cause == null ? null : cause.getMessage(), cause);
this.errorCode = errorCode;
this.errorClass = errorClass;
this.host = host;
}
@JsonCreator
public QueryException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
@JsonProperty("host") @Nullable String host
)
{
super(errorMessage);
this.errorCode = errorCode;
this.errorClass = errorClass;
this.host = host;
}
@Nullable
@JsonProperty("error")
public String getErrorCode()
{
return errorCode;
}
@JsonProperty("errorMessage")
@Override
public String getMessage()
{
return super.getMessage();
}
@JsonProperty
public String getErrorClass()
{
return errorClass;
}
@JsonProperty
public String getHost()
{
return host;
}
}

View File

@ -42,7 +42,7 @@ import java.util.concurrent.TimeoutException;
* The QueryResource is expected to emit the JSON form of this object when errors happen, and the DirectDruidClient
* deserializes and wraps them.
*/
public class QueryInterruptedException extends RuntimeException
public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
@ -52,10 +52,6 @@ public class QueryInterruptedException extends RuntimeException
public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";
private final String errorCode;
private final String errorClass;
private final String host;
@JsonCreator
public QueryInterruptedException(
@JsonProperty("error") @Nullable String errorCode,
@ -64,10 +60,7 @@ public class QueryInterruptedException extends RuntimeException
@JsonProperty("host") @Nullable String host
)
{
super(errorMessage);
this.errorCode = errorCode;
this.errorClass = errorClass;
this.host = host;
super(errorCode, errorMessage, errorClass, host);
}
/**
@ -83,36 +76,7 @@ public class QueryInterruptedException extends RuntimeException
public QueryInterruptedException(Throwable cause, String host)
{
super(cause == null ? null : cause.getMessage(), cause);
this.errorCode = getErrorCodeFromThrowable(cause);
this.errorClass = getErrorClassFromThrowable(cause);
this.host = host;
}
@Nullable
@JsonProperty("error")
public String getErrorCode()
{
return errorCode;
}
@JsonProperty("errorMessage")
@Override
public String getMessage()
{
return super.getMessage();
}
@JsonProperty
public String getErrorClass()
{
return errorClass;
}
@JsonProperty
public String getHost()
{
return host;
super(cause, getErrorCodeFromThrowable(cause), getErrorClassFromThrowable(cause), host);
}
@Override
@ -121,9 +85,9 @@ public class QueryInterruptedException extends RuntimeException
return StringUtils.format(
"QueryInterruptedException{msg=%s, code=%s, class=%s, host=%s}",
getMessage(),
errorCode,
errorClass,
host
getErrorCode(),
getErrorClass(),
getHost()
);
}

View File

@ -43,5 +43,5 @@ public interface QueryWatcher
* @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged
* @param future the future holding the execution status of the query
*/
void registerQuery(Query query, ListenableFuture future);
void registerQueryFuture(Query<?> query, ListenableFuture<?> future);
}

View File

@ -346,7 +346,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
{
try {
if (queryWatcher != null) {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
}
if (hasTimeout && timeout <= 0) {

View File

@ -219,7 +219,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
}
);
try {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {

View File

@ -84,7 +84,7 @@ public class ChainedExecutionQueryRunnerTest
Capture<ListenableFuture> capturedFuture = EasyMock.newCapture();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(
watcher.registerQueryFuture(
EasyMock.anyObject(),
EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))
);
@ -207,7 +207,7 @@ public class ChainedExecutionQueryRunnerTest
Capture<ListenableFuture> capturedFuture = Capture.newInstance();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(
watcher.registerQueryFuture(
EasyMock.anyObject(),
EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))
);

View File

@ -676,7 +676,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{
}

View File

@ -411,7 +411,7 @@ public class GroupByMultiSegmentTest
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{
}

View File

@ -311,6 +311,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -69,6 +69,7 @@ import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@ -113,6 +114,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final DruidHttpClientConfig httpClientConfig;
private final DruidProcessingConfig processingConfig;
private final ForkJoinPool pool;
private final QueryScheduler scheduler;
@Inject
public CachingClusteredClient(
@ -124,7 +126,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
CacheConfig cacheConfig,
@Client DruidHttpClientConfig httpClientConfig,
DruidProcessingConfig processingConfig,
@Merging ForkJoinPool pool
@Merging ForkJoinPool pool,
QueryScheduler scheduler
)
{
this.warehouse = warehouse;
@ -136,6 +139,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.httpClientConfig = httpClientConfig;
this.processingConfig = processingConfig;
this.pool = pool;
this.scheduler = scheduler;
if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
log.warn(
@ -222,9 +226,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
*/
private class SpecificQueryRunnable<T>
{
private final QueryPlus<T> queryPlus;
private final ResponseContext responseContext;
private final Query<T> query;
private QueryPlus<T> queryPlus;
private Query<T> query;
private final QueryToolChest<T, Query<T>> toolChest;
@Nullable
private final CacheStrategy<T, Object, Query<T>> strategy;
@ -232,7 +236,6 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final boolean populateCache;
private final boolean isBySegment;
private final int uncoveredIntervalsLimit;
private final Query<T> downstreamQuery;
private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = new HashMap<>();
private final DataSourceAnalysis dataSourceAnalysis;
private final List<Interval> intervals;
@ -251,7 +254,6 @@ public class CachingClusteredClient implements QuerySegmentWalker
// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/apache/druid/issues/2108
this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext());
this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
// For nested queries, we need to look at the intervals of the inner most query.
this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
@ -265,6 +267,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
final int priority = QueryContexts.getPriority(query);
contextBuilder.put(QueryContexts.PRIORITY_KEY, priority);
final String lane = QueryContexts.getLane(query);
if (lane != null) {
contextBuilder.put(QueryContexts.LANE_KEY, lane);
}
if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
@ -288,27 +294,34 @@ public class CachingClusteredClient implements QuerySegmentWalker
computeUncoveredIntervals(timeline);
}
final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline);
final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline);
@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@Nullable
final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
@Nullable
final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
final String currentEtag = computeCurrentEtag(segmentServers, queryCacheKey);
if (currentEtag != null && currentEtag.equals(prevEtag)) {
return Sequences.empty();
}
}
final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
return new LazySequence<>(() -> {
final List<Pair<Interval, byte[]>> alreadyCachedResults =
pruneSegmentsWithCachedResults(queryCacheKey, segmentServers);
query = scheduler.laneQuery(queryPlus, segmentServers);
queryPlus = queryPlus.withQuery(query);
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segmentServers);
LazySequence<T> mergedResultSequence = new LazySequence<>(() -> {
List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
return merge(sequencesByInterval);
});
return scheduler.run(query, mergedResultSequence);
}
private Sequence<T> merge(List<Sequence<T>> sequencesByInterval)
@ -347,14 +360,14 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
}
private Set<ServerToSegment> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
private Set<SegmentServerSelector> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
{
final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
query,
intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
);
final Set<ServerToSegment> segments = new LinkedHashSet<>();
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new HashMap<>();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
@ -371,7 +384,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
holder.getVersion(),
chunk.getChunkNumber()
);
segments.add(new ServerToSegment(server, segment));
segments.add(new SegmentServerSelector(server, segment));
}
}
return segments;
@ -431,11 +444,11 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
@Nullable
private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable byte[] queryCacheKey)
private String computeCurrentEtag(final Set<SegmentServerSelector> segments, @Nullable byte[] queryCacheKey)
{
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (ServerToSegment p : segments) {
for (SegmentServerSelector p : segments) {
if (!p.getServer().pick().getServer().segmentReplicatable()) {
hasOnlyHistoricalSegments = false;
break;
@ -460,14 +473,14 @@ public class CachingClusteredClient implements QuerySegmentWalker
private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
final byte[] queryCacheKey,
final Set<ServerToSegment> segments
final Set<SegmentServerSelector> segments
)
{
if (queryCacheKey == null) {
return Collections.emptyList();
}
final List<Pair<Interval, byte[]>> alreadyCachedResults = new ArrayList<>();
Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
Map<SegmentServerSelector, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
// Pull cached segments from cache and remove from set of segments to query
final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
@ -488,25 +501,25 @@ public class CachingClusteredClient implements QuerySegmentWalker
return alreadyCachedResults;
}
private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(
Set<ServerToSegment> segments,
private Map<SegmentServerSelector, Cache.NamedKey> computePerSegmentCacheKeys(
Set<SegmentServerSelector> segments,
byte[] queryCacheKey
)
{
// cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
Map<ServerToSegment, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
for (ServerToSegment serverToSegment : segments) {
Map<SegmentServerSelector, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
for (SegmentServerSelector segmentServer : segments) {
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
serverToSegment.getServer().getSegment().getId().toString(),
serverToSegment.getSegmentDescriptor(),
segmentServer.getServer().getSegment().getId().toString(),
segmentServer.getSegmentDescriptor(),
queryCacheKey
);
cacheKeys.put(serverToSegment, segmentCacheKey);
cacheKeys.put(segmentServer, segmentCacheKey);
}
return cacheKeys;
}
private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<ServerToSegment, Cache.NamedKey> cacheKeys)
private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<SegmentServerSelector, Cache.NamedKey> cacheKeys)
{
if (useCache) {
return cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
@ -530,21 +543,21 @@ public class CachingClusteredClient implements QuerySegmentWalker
return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
}
private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments)
private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<SegmentServerSelector> segments)
{
final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>();
for (ServerToSegment serverToSegment : segments) {
final QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick();
for (SegmentServerSelector segmentServer : segments) {
final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick();
if (queryableDruidServer == null) {
log.makeAlert(
"No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
serverToSegment.getSegmentDescriptor(),
segmentServer.getSegmentDescriptor(),
query.getDataSource()
).emit();
} else {
final DruidServer server = queryableDruidServer.getServer();
serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(serverToSegment.getSegmentDescriptor());
serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(segmentServer.getSegmentDescriptor());
}
}
return serverSegments;
@ -668,11 +681,12 @@ public class CachingClusteredClient implements QuerySegmentWalker
)
{
@SuppressWarnings("unchecked")
final Query<T> downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext());
final Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner.run(
queryPlus
.withQuery(
Queries.withSpecificSegments(
(Query<Result<BySegmentResultValueClass<T>>>) downstreamQuery,
downstreamQuery,
segmentsOfServer
)
)
@ -697,22 +711,4 @@ public class CachingClusteredClient implements QuerySegmentWalker
.flatMerge(seq -> seq, query.getResultOrdering());
}
}
private static class ServerToSegment extends Pair<ServerSelector, SegmentDescriptor>
{
private ServerToSegment(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
}
ServerSelector getServer()
{
return lhs;
}
SegmentDescriptor getSegmentDescriptor()
{
return rhs;
}
}
}

View File

@ -451,7 +451,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
Duration.millis(timeLeft)
);
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
openConnections.getAndIncrement();
Futures.addCallback(

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.SegmentDescriptor;
/**
* Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
*
* Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
*/
public class SegmentServerSelector extends Pair<ServerSelector, SegmentDescriptor>
{
public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
}
public ServerSelector getServer()
{
return lhs;
}
public SegmentDescriptor getSegmentDescriptor()
{
return rhs;
}
}

View File

@ -21,7 +21,10 @@ package org.apache.druid.guice;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryWatcher;
@ -42,7 +45,8 @@ import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.server.QueryManager;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
import java.util.Map;
@ -50,8 +54,8 @@ import java.util.Map;
*/
public class QueryRunnerFactoryModule extends QueryToolChestModule
{
private static final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> MAPPINGS =
ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()
private static final Map<Class<? extends Query<?>>, Class<? extends QueryRunnerFactory<?, ?>>> MAPPINGS =
ImmutableMap.<Class<? extends Query<?>>, Class<? extends QueryRunnerFactory<?, ?>>>builder()
.put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)
.put(SearchQuery.class, SearchQueryRunnerFactory.class)
.put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
@ -67,21 +71,28 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
{
super.configure(binder);
binder.bind(QueryWatcher.class)
.to(QueryManager.class)
.in(LazySingleton.class);
binder.bind(QueryManager.class)
binder.bind(QueryScheduler.class)
.toProvider(Key.get(QuerySchedulerProvider.class, Global.class))
.in(LazySingleton.class);
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.query.scheduler", QuerySchedulerProvider.class, Global.class);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(
binder
);
for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : MAPPINGS.entrySet()) {
for (Map.Entry<Class<? extends Query<?>>, Class<? extends QueryRunnerFactory<?, ?>>> entry : MAPPINGS.entrySet()) {
queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue());
binder.bind(entry.getValue()).in(LazySingleton.class);
}
binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);
}
@LazySingleton
@Provides
public QueryWatcher getWatcher(QueryScheduler scheduler)
{
return scheduler;
}
}

View File

@ -58,7 +58,7 @@ public class BrokerQueryResource extends QueryResource
QueryLifecycleFactory queryLifecycleFactory,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryManager queryManager,
QueryScheduler queryScheduler,
AuthConfig authConfig,
AuthorizerMapper authorizerMapper,
GenericQueryMetricsFactory queryMetricsFactory,
@ -69,7 +69,7 @@ public class BrokerQueryResource extends QueryResource
queryLifecycleFactory,
jsonMapper,
smileMapper,
queryManager,
queryScheduler,
authConfig,
authorizerMapper,
queryMetricsFactory

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryException;
/**
* This exception is for {@link QueryResource} and SqlResource to surface when a query is rejected by
* {@link QueryScheduler}.
*
* As a {@link QueryException} it is expected to be serialized to a JSON response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
*/
public class QueryCapacityExceededException extends QueryException
{
private static final String ERROR_CLASS = QueryCapacityExceededException.class.getName();
public static final String ERROR_CODE = "Query capacity exceeded";
public static final String ERROR_MESSAGE = "Total query capacity exceeded";
public static final String ERROR_MESSAGE_TEMPLATE = "Query capacity exceeded for lane '%s'";
public static final int STATUS_CODE = 429;
public QueryCapacityExceededException()
{
super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, null);
}
public QueryCapacityExceededException(String lane)
{
super(ERROR_CODE, StringUtils.format(ERROR_MESSAGE_TEMPLATE, lane), ERROR_CLASS, null);
}
@JsonCreator
public QueryCapacityExceededException(
@JsonProperty("error") String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") String errorClass)
{
super(errorCode, errorMessage, errorClass, null);
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import java.util.Optional;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class),
@JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class)
})
public interface QueryLaningStrategy
{
/**
* Provide a map of lane names to the limit on the number of concurrent queries for that lane
* @param totalLimit the total capacity (maximum number of concurrent queries) available to be divided among lanes
*/
Object2IntMap<String> getLaneLimits(int totalLimit);
/**
* For a given {@link QueryPlus} and set of {@link SegmentServerSelector}, compute if a query belongs to a lane
*
* This method must be thread safe
*/
<T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments);
}
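To make the extension point concrete, here is a hypothetical strategy, not part of this change, that confines queries against a single datasource to a dedicated lane. A real implementation would also need a `@JsonCreator` constructor and registration as a named `@JsonSubTypes` entry above to be selectable via `druid.query.scheduler.laning.strategy`:
```java
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.QueryLaningStrategy;

import java.util.Optional;
import java.util.Set;

public class DatasourceQueryLaningStrategy implements QueryLaningStrategy
{
  private final String datasource;
  private final int maxPercent;

  public DatasourceQueryLaningStrategy(String datasource, int maxPercent)
  {
    this.datasource = datasource;
    this.maxPercent = maxPercent;
  }

  @Override
  public Object2IntMap<String> getLaneLimits(int totalLimit)
  {
    // cap the lane at a percentage of total capacity, rounding up like 'hilo'
    Object2IntMap<String> lanes = new Object2IntArrayMap<>(1);
    lanes.put(datasource, (int) Math.ceil(totalLimit * (maxPercent / 100.0)));
    return lanes;
  }

  @Override
  public <T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments)
  {
    // lane any query that reads the target datasource
    if (query.getQuery().getDataSource().getTableNames().contains(datasource)) {
      return Optional.of(datasource);
    }
    return Optional.empty();
  }
}
```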

View File

@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryWatcher;
import java.util.Set;
public class QueryManager implements QueryWatcher
{
private final SetMultimap<String, ListenableFuture> queries;
private final SetMultimap<String, String> queryDatasources;
public QueryManager()
{
this.queries = Multimaps.synchronizedSetMultimap(
HashMultimap.create()
);
this.queryDatasources = Multimaps.synchronizedSetMultimap(
HashMultimap.create()
);
}
public boolean cancelQuery(String id)
{
queryDatasources.removeAll(id);
Set<ListenableFuture> futures = queries.removeAll(id);
boolean success = true;
for (ListenableFuture future : futures) {
success = success && future.cancel(true);
}
return success;
}
@Override
public void registerQuery(Query query, final ListenableFuture future)
{
final String id = query.getId();
final Set<String> datasources = query.getDataSource().getTableNames();
queries.put(id, future);
queryDatasources.putAll(id, datasources);
future.addListener(
new Runnable()
{
@Override
public void run()
{
queries.remove(id, future);
for (String datasource : datasources) {
queryDatasources.remove(id, datasource);
}
}
},
Execs.directExecutor()
);
}
public Set<String> getQueryDatasources(final String queryId)
{
return queryDatasources.get(queryId);
}
}

View File

@ -99,7 +99,7 @@ public class QueryResource implements QueryCountStatsProvider
protected final ObjectMapper smileMapper;
protected final ObjectMapper serializeDateTimeAsLongJsonMapper;
protected final ObjectMapper serializeDateTimeAsLongSmileMapper;
protected final QueryManager queryManager;
protected final QueryScheduler queryScheduler;
protected final AuthConfig authConfig;
protected final AuthorizerMapper authorizerMapper;
@ -113,7 +113,7 @@ public class QueryResource implements QueryCountStatsProvider
QueryLifecycleFactory queryLifecycleFactory,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryManager queryManager,
QueryScheduler queryScheduler,
AuthConfig authConfig,
AuthorizerMapper authorizerMapper,
GenericQueryMetricsFactory queryMetricsFactory
@ -124,7 +124,7 @@ public class QueryResource implements QueryCountStatsProvider
this.smileMapper = smileMapper;
this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper);
this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper);
this.queryManager = queryManager;
this.queryScheduler = queryScheduler;
this.authConfig = authConfig;
this.authorizerMapper = authorizerMapper;
this.queryMetricsFactory = queryMetricsFactory;
@ -138,9 +138,9 @@ public class QueryResource implements QueryCountStatsProvider
if (log.isDebugEnabled()) {
log.debug("Received cancel request for query [%s]", queryId);
}
Set<String> datasources = queryManager.getQueryDatasources(queryId);
Set<String> datasources = queryScheduler.getQueryDatasources(queryId);
if (datasources == null) {
log.warn("QueryId [%s] not registered with QueryManager, cannot cancel", queryId);
log.warn("QueryId [%s] not registered with QueryScheduler, cannot cancel", queryId);
datasources = new TreeSet<>();
}
@ -154,7 +154,7 @@ public class QueryResource implements QueryCountStatsProvider
throw new ForbiddenException(authResult.toString());
}
queryManager.cancelQuery(queryId);
queryScheduler.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build();
}
@ -310,6 +310,11 @@ public class QueryResource implements QueryCountStatsProvider
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
return ioReaderWriter.gotError(e);
}
catch (QueryCapacityExceededException cap) {
failedQueryCount.incrementAndGet();
queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1);
return ioReaderWriter.gotLimited(cap);
}
catch (ForbiddenException e) {
// don't do anything for an authorization failure, ForbiddenExceptionMapper will catch this later and
// send an error response if this is thrown.
@ -434,6 +439,13 @@ public class QueryResource implements QueryCountStatsProvider
)
.build();
}
Response gotLimited(QueryCapacityExceededException e) throws IOException
{
return Response.status(QueryCapacityExceededException.STATUS_CODE)
.entity(newOutputWriter(null, null, false).writeValueAsBytes(e))
.build();
}
}
@Override

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.server.initialization.ServerConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* QueryScheduler (potentially) assigns any {@link Query} that is to be executed to a 'query lane' using the
* {@link QueryLaningStrategy} that is defined in {@link QuerySchedulerConfig}.
*
* As a {@link QueryWatcher}, it also provides cancellation facilities to brokers, historicals, and realtime tasks.
*
* This class is shared by all requests on the HTTP threadpool and must be thread safe.
*/
public class QueryScheduler implements QueryWatcher
{
private static final int NO_CAPACITY = -1;
static final String TOTAL = "default";
private final int totalCapacity;
private final QueryLaningStrategy laningStrategy;
private final BulkheadRegistry laneRegistry;
/**
* mapping of query id to set of futures associated with the query
*/
private final SetMultimap<String, ListenableFuture<?>> queryFutures;
/**
* mapping of query id to set of datasource names that are being queried, used for authorization
*/
private final SetMultimap<String, String> queryDatasources;
public QueryScheduler(int totalNumThreads, QueryLaningStrategy laningStrategy, ServerConfig serverConfig)
{
this.laningStrategy = laningStrategy;
this.queryFutures = Multimaps.synchronizedSetMultimap(HashMultimap.create());
this.queryDatasources = Multimaps.synchronizedSetMultimap(HashMultimap.create());
// if totalNumThreads is above 0 and less than druid.server.http.numThreads, enforce total limit
final boolean limitTotal;
if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads()) {
limitTotal = true;
this.totalCapacity = totalNumThreads;
} else {
limitTotal = false;
this.totalCapacity = serverConfig.getNumThreads();
}
this.laneRegistry = BulkheadRegistry.of(getLaneConfigs(limitTotal));
}
@Override
public void registerQueryFuture(Query<?> query, ListenableFuture<?> future)
{
final String id = query.getId();
final Set<String> datasources = query.getDataSource().getTableNames();
queryFutures.put(id, future);
queryDatasources.putAll(id, datasources);
future.addListener(
() -> {
queryFutures.remove(id, future);
for (String datasource : datasources) {
queryDatasources.remove(id, datasource);
}
},
Execs.directExecutor()
);
}
/**
* Assign a query a lane (if not set)
*/
public <T> Query<T> laneQuery(QueryPlus<T> queryPlus, Set<SegmentServerSelector> segments)
{
Query<T> query = queryPlus.getQuery();
Optional<String> lane = laningStrategy.computeLane(queryPlus, segments);
return lane.map(query::withLane).orElse(query);
}
/**
* Run a query with the scheduler, attempting to acquire a semaphore from the total and lane specific query capacities
*
* Note that {@link #cancelQuery} should not interrupt the thread that calls run, in all current usages it only
* cancels any {@link ListenableFuture} created downstream. If this ever commonly changes, we should add
* synchronization between {@link #cancelQuery} and the acquisition of the {@link Bulkhead} to continue to ensure that
* anything acquired is also released.
*
* In the meantime, if a {@link ListenableFuture} is registered for the query that calls this method, it MUST handle
* this synchronization itself to ensure that no {@link Bulkhead} is acquired without releasing it.
*/
public <T> Sequence<T> run(Query<?> query, Sequence<T> resultSequence)
{
List<Bulkhead> bulkheads = acquireLanes(query);
return resultSequence.withBaggage(() -> finishLanes(bulkheads));
}
/**
* Forcibly cancel all futures that have been registered to a specific query id
*/
public boolean cancelQuery(String id)
{
// if multiple independent queries from the same or different users share a query id, all will be cancelled due
// to the collision
queryDatasources.removeAll(id);
Set<ListenableFuture<?>> futures = queryFutures.removeAll(id);
boolean success = true;
for (ListenableFuture<?> future : futures) {
success = success && future.cancel(true);
}
return success;
}
/**
* Get a {@link Set} of datasource names for a {@link Query} id, used by {@link QueryResource#cancelQuery} to
* authorize that a user may call {@link #cancelQuery} for the given id and datasources
*/
public Set<String> getQueryDatasources(final String queryId)
{
return queryDatasources.get(queryId);
}
/**
* Get the maximum number of concurrent queries that {@link #run} can support
*/
@VisibleForTesting
int getTotalAvailableCapacity()
{
return laneRegistry.getConfiguration(TOTAL)
.map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
}
/**
* Get the maximum number of concurrent queries that {@link #run} can support for a given lane
*/
@VisibleForTesting
int getLaneAvailableCapacity(String lane)
{
return laneRegistry.getConfiguration(lane)
.map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
}
/**
* Acquire a semaphore for both the 'total' and a lane, if any is associated with a query
*/
@VisibleForTesting
List<Bulkhead> acquireLanes(Query<?> query)
{
final String lane = QueryContexts.getLane(query);
final Optional<BulkheadConfig> laneConfig = lane == null ? Optional.empty() : laneRegistry.getConfiguration(lane);
final Optional<BulkheadConfig> totalConfig = laneRegistry.getConfiguration(TOTAL);
List<Bulkhead> hallPasses = new ArrayList<>(2);
try {
// if we have a lane, get it first
laneConfig.ifPresent(config -> {
Bulkhead laneLimiter = laneRegistry.bulkhead(lane, config);
if (!laneLimiter.tryAcquirePermission()) {
throw new QueryCapacityExceededException(lane);
}
hallPasses.add(laneLimiter);
});
// everyone needs to take one from the total lane; to ensure we don't acquire a lane and never release it, we want
// to check for total capacity exceeded and release the lane (if present) before throwing capacity exceeded
// note that this isn't strictly fair: the bulkhead doesn't use a fair semaphore, the first to acquire the lane
// might lose to one that came after it when acquiring the total, or an unlaned query might lose to a laned query
totalConfig.ifPresent(config -> {
Bulkhead totalLimiter = laneRegistry.bulkhead(TOTAL, config);
if (!totalLimiter.tryAcquirePermission()) {
throw new QueryCapacityExceededException();
}
hallPasses.add(totalLimiter);
});
return hallPasses;
}
catch (Exception ex) {
releaseLanes(hallPasses);
throw ex;
}
}
/**
* Release all {@link Bulkhead} semaphores in the list
*/
@VisibleForTesting
void releaseLanes(List<Bulkhead> bulkheads)
{
bulkheads.forEach(Bulkhead::releasePermission);
}
@VisibleForTesting
void finishLanes(List<Bulkhead> bulkheads)
{
bulkheads.forEach(Bulkhead::onComplete);
}
/**
* With a total thread count and {@link QueryLaningStrategy#getLaneLimits}, create a map of lane name to
* {@link BulkheadConfig} to be used to create the {@link #laneRegistry}. This accepts the configured value of
* numThreads rather than using {@link #totalCapacity} so that we only have a total {@link Bulkhead} if
* {@link QuerySchedulerConfig#getNumThreads()} is set
*/
private Map<String, BulkheadConfig> getLaneConfigs(boolean hasTotalLimit)
{
Map<String, BulkheadConfig> configs = new HashMap<>();
if (hasTotalLimit) {
configs.put(
TOTAL,
BulkheadConfig.custom().maxConcurrentCalls(totalCapacity).maxWaitDuration(Duration.ZERO).build()
);
}
for (Object2IntMap.Entry<String> entry : laningStrategy.getLaneLimits(totalCapacity).object2IntEntrySet()) {
configs.put(
entry.getKey(),
BulkheadConfig.custom().maxConcurrentCalls(entry.getIntValue()).maxWaitDuration(Duration.ZERO).build()
);
}
return configs;
}
}
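To tie the pieces together, here is a minimal sketch of the call sequence a caller such as `CachingClusteredClient` (shown earlier in this commit) performs; the helper method and its parameters are hypothetical:
```java
<T> Sequence<T> schedule(
    QueryScheduler scheduler,
    QueryPlus<T> queryPlus,
    Set<SegmentServerSelector> segmentServers,
    Sequence<T> resultSequence
)
{
  // 1) let the laning strategy assign a lane, if the query does not already have one
  Query<T> laned = scheduler.laneQuery(queryPlus, segmentServers);
  // 2) acquire the 'total' and (if laned) lane Bulkhead permits; run throws
  //    QueryCapacityExceededException (mapped to HTTP 429) when either is at
  //    capacity, and releases the permits once the sequence is consumed or closed
  return scheduler.run(laned, resultSequence);
}
```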

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
public class QuerySchedulerConfig
{
@JsonProperty
private Integer numThreads = 0;
@JsonProperty("laning")
private QueryLaningStrategy laningStrategy = NoQueryLaningStrategy.INSTANCE;
public int getNumThreads()
{
return numThreads;
}
public QueryLaningStrategy getLaningStrategy()
{
return laningStrategy;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.druid.server.initialization.ServerConfig;
public class QuerySchedulerProvider extends QuerySchedulerConfig implements Provider<QueryScheduler>
{
private final ServerConfig serverConfig;
/**
* This constructor must be marked with the Guice {@code @Inject} annotation to be bound correctly, and also with
* Jackson's {@code @JsonCreator} and {@code @JacksonInject} to work with {@link org.apache.druid.guice.JsonConfigProvider}.
*/
@Inject
@JsonCreator
public QuerySchedulerProvider(@JacksonInject ServerConfig serverConfig)
{
this.serverConfig = serverConfig;
}
@Override
public QueryScheduler get()
{
return new QueryScheduler(getNumThreads(), getLaningStrategy(), serverConfig);
}
}
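The binding that makes this provider reachable from runtime.properties appears in QuerySchedulerTest.createInjector() later in this diff; as a sketch, a Guice module doing the same wiring:

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.annotations.Global;

class SchedulerBindingSketch implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // binds QuerySchedulerProvider to the druid.query.scheduler property prefix so that
    // Guice materializes the QueryScheduler through Provider#get with ServerConfig injected
    JsonConfigProvider.bind(binder, "druid.query.scheduler", QuerySchedulerProvider.class, Global.class);
  }
}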


@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.QueryLaningStrategy;
import java.util.Optional;
import java.util.Set;
/**
* Query laning strategy which assigns all {@link Query} with a priority lower than 0 to a 'low' lane
*/
public class HiLoQueryLaningStrategy implements QueryLaningStrategy
{
public static final String LOW = "low";
@JsonProperty
private final int maxLowPercent;
@JsonCreator
public HiLoQueryLaningStrategy(
@JsonProperty("maxLowPercent") Integer maxLowPercent
)
{
this.maxLowPercent = Preconditions.checkNotNull(maxLowPercent, "maxLowPercent must be set");
Preconditions.checkArgument(
0 < maxLowPercent && maxLowPercent <= 100,
"maxLowPercent must be in the range 1 to 100"
);
}
@Override
public Object2IntMap<String> getLaneLimits(int totalLimit)
{
Object2IntMap<String> onlyLow = new Object2IntArrayMap<>(1);
onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100))));
return onlyLow;
}
@Override
public <T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments)
{
final Query<T> theQuery = query.getQuery();
// deliberately not using QueryContexts.getPriority, since it supplies a default; we only lane queries that set a priority explicitly
final Integer priority = theQuery.getContextValue(QueryContexts.PRIORITY_KEY);
final String lane = theQuery.getContextValue(QueryContexts.LANE_KEY);
if (lane == null && priority != null && priority < 0) {
return Optional.of(LOW);
}
return Optional.ofNullable(lane);
}
}
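To make the limit arithmetic concrete, the 'low' lane receives ceil(totalLimit * maxLowPercent / 100) slots, so any configured lane keeps at least one slot; for example (matching HiLoQueryLaningStrategyTest later in this diff):

// with 5 total slots and maxLowPercent = 40: ceil(5 * 0.40) = 2 slots for the 'low' lane
int lowLimit = new HiLoQueryLaningStrategy(40).getLaneLimits(5).getInt(HiLoQueryLaningStrategy.LOW);
// with 25 total slots and maxLowPercent = 1: ceil(25 * 0.01) = 1, rounding up keeps the lane usable
int minLimit = new HiLoQueryLaningStrategy(1).getLaneLimits(25).getInt(HiLoQueryLaningStrategy.LOW);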


@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.QueryLaningStrategy;
import java.util.Optional;
import java.util.Set;
/**
* Query laning strategy that does nothing and provides the default, unlimited behavior
*/
public class NoQueryLaningStrategy implements QueryLaningStrategy
{
private static final Object2IntMap<String> NONE = new Object2IntArrayMap<>();
public static final NoQueryLaningStrategy INSTANCE = new NoQueryLaningStrategy();
@Override
public Object2IntMap<String> getLaneLimits(int totalLimit)
{
return NONE;
}
@Override
public <T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments)
{
return Optional.ofNullable(QueryContexts.getLane(query.getQuery()));
}
}
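Even this default strategy preserves a lane the user set explicitly in the query context (the "preserve user specified lane" change noted in the commit message); a short illustration mirroring NoQueryLaningStrategyTest later in this diff:

// a query whose context carries {"lane": "some-lane"} keeps that lane under the default strategy
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
    .dataSource("test")
    .intervals(ImmutableList.of(Intervals.ETERNITY))
    .granularity(Granularities.DAY)
    .aggregators(new CountAggregatorFactory("count"))
    .context(ImmutableMap.of(QueryContexts.LANE_KEY, "some-lane"))
    .build();
Optional<String> lane = NoQueryLaningStrategy.INSTANCE.computeLane(QueryPlus.wrap(query), ImmutableSet.of());
// lane contains "some-lane"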


@ -47,7 +47,10 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -330,7 +333,8 @@ public class CachingClusteredClientFunctionalityTest
return 4;
}
},
ForkJoinPool.commonPool(),
new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);
}


@ -116,7 +116,10 @@ import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -2473,7 +2476,8 @@ public class CachingClusteredClientTest
return 4;
}
},
ForkJoinPool.commonPool(),
new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);
}


@ -22,28 +22,32 @@ package org.apache.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeboundary.TimeBoundaryResultValue;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@ -69,33 +73,31 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public class QueryResourceTest
{
private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(ImmutableMap.of());
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AuthenticationResult AUTHENTICATION_RESULT = new AuthenticationResult("druid", "druid", null, null);
private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class);
private static final QuerySegmentWalker TEST_SEGMENT_WALKER = new QuerySegmentWalker()
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return (queryPlus, responseContext) -> Sequences.empty();
}
@Override
@ -105,11 +107,43 @@ public class QueryResourceTest
}
};
private static final String SIMPLE_TIMESERIES_QUERY =
"{\n"
+ " \"queryType\": \"timeseries\",\n"
+ " \"dataSource\": \"mmx_metrics\",\n"
+ " \"granularity\": \"hour\",\n"
+ " \"intervals\": [\n"
+ " \"2014-12-17/2015-12-30\"\n"
+ " ],\n"
+ " \"aggregations\": [\n"
+ " {\n"
+ " \"type\": \"count\",\n"
+ " \"name\": \"rows\"\n"
+ " }\n"
+ " ]\n"
+ "}";
private static final String SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY =
"{\n"
+ " \"queryType\": \"timeseries\",\n"
+ " \"dataSource\": \"mmx_metrics\",\n"
+ " \"granularity\": \"hour\",\n"
+ " \"intervals\": [\n"
+ " \"2014-12-17/2015-12-30\"\n"
+ " ],\n"
+ " \"aggregations\": [\n"
+ " {\n"
+ " \"type\": \"count\",\n"
+ " \"name\": \"rows\"\n"
+ " }\n"
+ " ],\n"
+ " \"context\": { \"priority\": -1 }"
+ "}";
private static final ServiceEmitter NOOP_SERVICE_EMITTER = new NoopServiceEmitter();
private QueryResource queryResource;
private QueryScheduler queryScheduler;
private TestRequestLogger testRequestLogger;
@BeforeClass
@ -125,7 +159,7 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryScheduler = new QueryScheduler(8, NoQueryLaningStrategy.INSTANCE, new ServerConfig());
testRequestLogger = new TestRequestLogger();
queryResource = new QueryResource(
new QueryLifecycleFactory(
@ -139,44 +173,25 @@ public class QueryResourceTest
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
null,
new DefaultGenericQueryMetricsFactory()
);
}
@After
public void tearDown()
{
EasyMock.verify(testServletRequest);
}
@Test
public void testGoodQuery() throws IOException
{
expectPermissiveHappyPathAuth();
Response response = queryResource.doPost(
new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes("UTF-8")),
null /*pretty*/,
@ -361,7 +376,7 @@ public class QueryResourceTest
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
authMapper,
new DefaultGenericQueryMetricsFactory()
@ -475,7 +490,7 @@ public class QueryResourceTest
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
authMapper,
new DefaultGenericQueryMetricsFactory()
@ -511,7 +526,7 @@ public class QueryResourceTest
}
);
queryScheduler.registerQueryFuture(query, future);
startAwaitLatch.await();
Executors.newSingleThreadExecutor().submit(
@ -597,7 +612,7 @@ public class QueryResourceTest
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
authMapper,
new DefaultGenericQueryMetricsFactory()
@ -633,7 +648,7 @@ public class QueryResourceTest
}
);
queryScheduler.registerQueryFuture(query, future);
startAwaitLatch.await();
Executors.newSingleThreadExecutor().submit(
@ -655,9 +670,186 @@ public class QueryResourceTest
waitFinishLatch.await();
}
@Test(timeout = 10_000L)
public void testTooManyQuery() throws InterruptedException
{
expectPermissiveHappyPathAuth();
final CountDownLatch waitTwoScheduled = new CountDownLatch(2);
final CountDownLatch waitAllFinished = new CountDownLatch(3);
final QueryScheduler laningScheduler = new QueryScheduler(2, NoQueryLaningStrategy.INSTANCE, new ServerConfig());
createScheduledQueryResource(laningScheduler, Collections.emptyList(), ImmutableList.of(waitTwoScheduled));
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus())
);
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus())
);
waitTwoScheduled.await();
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
response -> {
Assert.assertEquals(QueryCapacityExceededException.STATUS_CODE, response.getStatus());
QueryCapacityExceededException ex;
try {
ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryCapacityExceededException.class);
}
catch (IOException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(QueryCapacityExceededException.ERROR_MESSAGE, ex.getMessage());
Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, ex.getErrorCode());
}
);
waitAllFinished.await();
}
@Test(timeout = 10_000L)
public void testTooManyQueryInLane() throws InterruptedException
{
expectPermissiveHappyPathAuth();
final CountDownLatch waitTwoStarted = new CountDownLatch(2);
final CountDownLatch waitOneScheduled = new CountDownLatch(1);
final CountDownLatch waitAllFinished = new CountDownLatch(3);
final QueryScheduler scheduler = new QueryScheduler(40, new HiLoQueryLaningStrategy(1), new ServerConfig());
createScheduledQueryResource(scheduler, ImmutableList.of(waitTwoStarted), ImmutableList.of(waitOneScheduled));
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY,
waitAllFinished,
response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus())
);
waitOneScheduled.await();
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY,
waitAllFinished,
response -> {
Assert.assertEquals(QueryCapacityExceededException.STATUS_CODE, response.getStatus());
QueryCapacityExceededException ex;
try {
ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryCapacityExceededException.class);
}
catch (IOException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(
StringUtils.format(
QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE,
HiLoQueryLaningStrategy.LOW
),
ex.getMessage()
);
Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, ex.getErrorCode());
}
);
waitTwoStarted.await();
assertResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus())
);
waitAllFinished.await();
}
private void createScheduledQueryResource(
QueryScheduler scheduler,
Collection<CountDownLatch> beforeScheduler,
Collection<CountDownLatch> inScheduler
)
{
QuerySegmentWalker texasRanger = new QuerySegmentWalker()
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return (queryPlus, responseContext) -> {
beforeScheduler.forEach(CountDownLatch::countDown);
return scheduler.run(
scheduler.laneQuery(queryPlus, ImmutableSet.of()),
new LazySequence<T>(() -> {
inScheduler.forEach(CountDownLatch::countDown);
try {
// pretend to be a query that is waiting on results
Thread.sleep(500);
}
catch (InterruptedException ignored) {
}
// all that waiting for nothing :(
return Sequences.empty();
})
);
};
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
return getQueryRunnerForIntervals(null, null);
}
};
queryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
texasRanger,
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
),
JSON_MAPPER,
JSON_MAPPER,
scheduler,
new AuthConfig(),
null,
new DefaultGenericQueryMetricsFactory()
);
}
private void assertResponseAndCountdownOrBlockForever(String query, CountDownLatch done, Consumer<Response> asserts)
{
Executors.newSingleThreadExecutor().submit(() -> {
try {
Response response = queryResource.doPost(
new ByteArrayInputStream(query.getBytes("UTF-8")),
null,
testServletRequest
);
asserts.accept(response);
}
catch (IOException e) {
throw new RuntimeException(e);
}
done.countDown();
});
}
private void expectPermissiveHappyPathAuth()
{
EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
.andReturn(null)
.anyTimes();
EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(AUTHENTICATION_RESULT)
.anyTimes();
testServletRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(testServletRequest);
}
}


@ -0,0 +1,610 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
import io.github.resilience4j.bulkhead.Bulkhead;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
public class QuerySchedulerTest
{
private static final int NUM_CONCURRENT_QUERIES = 10000;
private static final int NUM_ROWS = 10000;
@Rule
public ExpectedException expected = ExpectedException.none();
private ListeningExecutorService executorService;
private QueryScheduler scheduler;
private AtomicLong totalAcquired;
private AtomicLong totalReleased;
private AtomicLong laneAcquired;
private AtomicLong laneNotAcquired;
private AtomicLong laneReleased;
@Before
public void setup()
{
executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(8, "test_query_scheduler_%s")
);
totalAcquired = new AtomicLong();
totalReleased = new AtomicLong();
laneAcquired = new AtomicLong();
laneNotAcquired = new AtomicLong();
laneReleased = new AtomicLong();
scheduler = new QueryScheduler(5, new HiLoQueryLaningStrategy(40), new ServerConfig()) {
@Override
List<Bulkhead> acquireLanes(Query<?> query)
{
List<Bulkhead> bulkheads = super.acquireLanes(query);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalAcquired.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneAcquired.incrementAndGet();
}
return bulkheads;
}
@Override
void releaseLanes(List<Bulkhead> bulkheads)
{
super.releaseLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
if (bulkheads.size() == 1) {
laneNotAcquired.incrementAndGet();
}
}
}
@Override
void finishLanes(List<Bulkhead> bulkheads)
{
super.finishLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
}
}
};
}
@After
public void teardown()
{
executorService.shutdownNow();
}
@Test
public void testHiLoHi() throws ExecutionException, InterruptedException
{
TopNQuery interactive = makeInteractiveQuery();
ListenableFuture<?> future = executorService.submit(() -> {
try {
Query<?> scheduled = scheduler.laneQuery(QueryPlus.wrap(interactive), ImmutableSet.of());
Assert.assertNotNull(scheduled);
Sequence<Integer> underlyingSequence = makeSequence(10);
underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper()
{
@Override
public void before()
{
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
});
Sequence<Integer> results = scheduler.run(scheduled, underlyingSequence);
int rowCount = consumeAndCloseSequence(results);
Assert.assertEquals(10, rowCount);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
});
future.get();
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
}
@Test
public void testHiLoLo() throws ExecutionException, InterruptedException
{
TopNQuery report = makeReportQuery();
ListenableFuture<?> future = executorService.submit(() -> {
try {
Query<?> scheduledReport = scheduler.laneQuery(QueryPlus.wrap(report), ImmutableSet.of());
Assert.assertNotNull(scheduledReport);
Assert.assertEquals(HiLoQueryLaningStrategy.LOW, QueryContexts.getLane(scheduledReport));
Sequence<Integer> underlyingSequence = makeSequence(10);
underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper()
{
@Override
public void before()
{
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
});
Sequence<Integer> results = scheduler.run(scheduledReport, underlyingSequence);
int rowCount = consumeAndCloseSequence(results);
Assert.assertEquals(10, rowCount);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
});
future.get();
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testHiLoReleaseLaneWhenSequenceExplodes() throws Exception
{
expected.expectMessage("exploded");
expected.expect(ExecutionException.class);
TopNQuery interactive = makeInteractiveQuery();
ListenableFuture<?> future = executorService.submit(() -> {
try {
Query<?> scheduled = scheduler.laneQuery(QueryPlus.wrap(interactive), ImmutableSet.of());
Assert.assertNotNull(scheduled);
Sequence<Integer> underlyingSequence = makeExplodingSequence(10);
underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper()
{
@Override
public void before()
{
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
}
});
Sequence<Integer> results = scheduler.run(scheduled, underlyingSequence);
consumeAndCloseSequence(results);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
});
future.get();
}
@Test
public void testHiLoFailsWhenOutOfLaneCapacity()
{
expected.expectMessage(
StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, HiLoQueryLaningStrategy.LOW)
);
expected.expect(QueryCapacityExceededException.class);
Query<?> report1 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report1, Sequences.empty());
Assert.assertNotNull(report1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> report2 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report2, Sequences.empty());
Assert.assertNotNull(report2);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
// too many reports
scheduler.run(scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty());
}
@Test
public void testHiLoFailsWhenOutOfTotalCapacity()
{
expected.expectMessage(QueryCapacityExceededException.ERROR_MESSAGE);
expected.expect(QueryCapacityExceededException.class);
Query<?> interactive1 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive1, Sequences.empty());
Assert.assertNotNull(interactive1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Query<?> report1 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report1, Sequences.empty());
Assert.assertNotNull(report1);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive2 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive2, Sequences.empty());
Assert.assertNotNull(interactive2);
Assert.assertEquals(2, scheduler.getTotalAvailableCapacity());
Query<?> report2 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report2, Sequences.empty());
Assert.assertNotNull(report2);
Assert.assertEquals(1, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive3 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive3, Sequences.empty());
Assert.assertNotNull(interactive3);
Assert.assertEquals(0, scheduler.getTotalAvailableCapacity());
// one too many
scheduler.run(scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty());
}
@Test
public void testConcurrency() throws Exception
{
List<Future<?>> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES);
for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) {
futures.add(makeQueryFuture(executorService, scheduler, makeRandomQuery(), NUM_ROWS));
maybeDelayNextIteration(i);
}
getFuturesAndAssertAftermathIsChill(futures, scheduler, false);
}
@Test
public void testConcurrencyLo() throws Exception
{
List<Future<?>> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES);
for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) {
futures.add(makeQueryFuture(executorService, scheduler, makeReportQuery(), NUM_ROWS));
maybeDelayNextIteration(i);
}
getFuturesAndAssertAftermathIsChill(futures, scheduler, false);
}
@Test
public void testConcurrencyHi() throws Exception
{
List<Future<?>> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES);
for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) {
futures.add(makeQueryFuture(executorService, scheduler, makeInteractiveQuery(), NUM_ROWS));
maybeDelayNextIteration(i);
}
getFuturesAndAssertAftermathIsChill(futures, scheduler, true);
}
@Test
public void testConfigNone()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
propertyPrefix,
QuerySchedulerProvider.class
);
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".numThreads", "10");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(-1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testConfigHiLo()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
propertyPrefix,
QuerySchedulerProvider.class
);
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".numThreads", "10");
properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
properties.setProperty(propertyPrefix + ".laning.maxLowPercent", "20");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testMisConfigHiLo()
{
expected.expect(ProvisionException.class);
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
propertyPrefix,
QuerySchedulerProvider.class
);
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
}
private void maybeDelayNextIteration(int i) throws InterruptedException
{
if (i > 0 && i % 10 == 0) {
Thread.sleep(2);
}
}
private TopNQuery makeRandomQuery()
{
return ThreadLocalRandom.current().nextBoolean() ? makeInteractiveQuery() : makeReportQuery();
}
private TopNQuery makeInteractiveQuery()
{
return makeBaseBuilder()
.context(ImmutableMap.of("priority", 10, "queryId", "high-" + UUID.randomUUID()))
.build();
}
private TopNQuery makeReportQuery()
{
return makeBaseBuilder()
.context(ImmutableMap.of("priority", -1, "queryId", "low-" + UUID.randomUUID()))
.build();
}
private TopNQueryBuilder makeBaseBuilder()
{
return new TopNQueryBuilder()
.dataSource("foo")
.intervals("2020-01-01/2020-01-02")
.dimension("bar")
.metric("chocula")
.aggregators(new CountAggregatorFactory("chocula"))
.threshold(10);
}
private <T> int consumeAndCloseSequence(Sequence<T> sequence) throws IOException
{
Yielder<T> yielder = Yielders.each(sequence);
int rowCount = 0;
while (!yielder.isDone()) {
rowCount++;
yielder = yielder.next(yielder.get());
}
yielder.close();
return rowCount;
}
private Sequence<Integer> makeSequence(int count)
{
return new LazySequence<>(() -> {
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return new Iterator<Integer>()
{
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < count;
}
@Override
public Integer next()
{
rowCounter++;
return rowCounter;
}
};
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
// nothing to cleanup
}
}
);
});
}
private Sequence<Integer> makeExplodingSequence(int explodeAfter)
{
final int explodeAt = explodeAfter + 1;
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return new Iterator<Integer>()
{
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < explodeAt;
}
@Override
public Integer next()
{
if (rowCounter == explodeAfter) {
throw new RuntimeException("exploded");
}
rowCounter++;
return rowCounter;
}
};
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
// nothing to cleanup
}
}
);
}
private ListenableFuture<?> makeQueryFuture(
ListeningExecutorService executorService,
QueryScheduler scheduler,
Query<?> query,
int numRows
)
{
return executorService.submit(() -> {
try {
Query<?> scheduled = scheduler.laneQuery(QueryPlus.wrap(query), ImmutableSet.of());
Assert.assertNotNull(scheduled);
Sequence<Integer> underlyingSequence = makeSequence(numRows);
Sequence<Integer> results = scheduler.run(scheduled, underlyingSequence);
final int actualNumRows = consumeAndCloseSequence(results);
Assert.assertEquals(numRows, actualNumRows);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
});
}
private void getFuturesAndAssertAftermathIsChill(
List<Future<?>> futures,
QueryScheduler scheduler,
boolean successEqualsTotal
)
{
int success = 0;
int denied = 0;
int other = 0;
for (Future<?> f : futures) {
try {
f.get();
success++;
}
catch (ExecutionException ex) {
if (ex.getCause() instanceof QueryCapacityExceededException) {
denied++;
} else {
other++;
}
}
catch (Exception ex) {
other++;
}
}
Assert.assertEquals(0, other);
if (successEqualsTotal) {
Assert.assertEquals(success, totalAcquired.get());
} else {
Assert.assertTrue(success > 0 && success <= totalAcquired.get());
}
Assert.assertTrue(denied > 0);
Assert.assertEquals(totalReleased.get(), totalAcquired.get());
Assert.assertEquals(laneReleased.get(), laneAcquired.get() + laneNotAcquired.get());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
}
private Injector createInjector()
{
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
binder.bind(ServerConfig.class).toInstance(new ServerConfig());
JsonConfigProvider.bind(binder, "druid.query.scheduler", QuerySchedulerProvider.class, Global.class);
}
)
);
ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ServerConfig.class, injector.getInstance(ServerConfig.class))
);
return injector;
}
}


@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.server.QueryLaningStrategy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class HiLoQueryLaningStrategyTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private Druids.TimeseriesQueryBuilder queryBuilder;
private HiLoQueryLaningStrategy strategy;
@Before
public void setup()
{
this.queryBuilder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
.granularity(Granularities.DAY)
.aggregators(new CountAggregatorFactory("count"));
this.strategy = new HiLoQueryLaningStrategy(40);
}
@Test
public void testMaxPercentageThreadsRequired()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("maxLowPercent must be set");
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(null);
}
@Test
public void testMaxLowPercentMustBeGreaterThanZero()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("maxLowPercent must be in the range 1 to 100");
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(-1);
}
@Test
public void testMaxLowPercentMustBeLessThanOrEqual100()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("maxLowPercent must be in the range 1 to 100");
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(9000);
}
@Test
public void testMaxLowPercentZero()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("maxLowPercent must be in the range 1 to 100");
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(0);
}
@Test
public void testMaxLowPercent100()
{
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(100);
Object2IntMap<String> laneConfig = strategy.getLaneLimits(25);
Assert.assertEquals(1, laneConfig.size());
Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(25, laneConfig.getInt(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testMaxLowPercentRoundsUp()
{
// will round up to 1
QueryLaningStrategy strategyRoundLow = new HiLoQueryLaningStrategy(1);
Object2IntMap<String> laneConfigRoundLow = strategyRoundLow.getLaneLimits(25);
Assert.assertEquals(1, laneConfigRoundLow.size());
Assert.assertTrue(laneConfigRoundLow.containsKey(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(1, laneConfigRoundLow.getInt(HiLoQueryLaningStrategy.LOW));
// will not round, evenly divides
QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(96);
Object2IntMap<String> laneConfig = strategy.getLaneLimits(25);
Assert.assertEquals(1, laneConfig.size());
Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(24, laneConfig.getInt(HiLoQueryLaningStrategy.LOW));
// will round up
QueryLaningStrategy strategyRounded = new HiLoQueryLaningStrategy(97);
Object2IntMap<String> laneConfigRounded = strategyRounded.getLaneLimits(25);
Assert.assertEquals(1, laneConfigRounded.size());
Assert.assertTrue(laneConfigRounded.containsKey(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(25, laneConfigRounded.getInt(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testLaneLimits()
{
Object2IntMap<String> laneConfig = strategy.getLaneLimits(5);
Assert.assertEquals(1, laneConfig.size());
Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(2, laneConfig.getInt(HiLoQueryLaningStrategy.LOW));
}
@Test
public void testLaningNoPriority()
{
TimeseriesQuery query = queryBuilder.build();
Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
}
@Test
public void testLaningZeroPriority()
{
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, 0)).build();
Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
}
@Test
public void testLaningInteractivePriority()
{
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100)).build();
Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
}
@Test
public void testLaningLowPriority()
{
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, -1)).build();
Assert.assertTrue(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
Assert.assertEquals(
HiLoQueryLaningStrategy.LOW,
strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
}
@Test
public void testLaningPreservesManualSetLane()
{
TimeseriesQuery query = queryBuilder.context(
ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100, QueryContexts.LANE_KEY, "low")
).build();
Assert.assertEquals(
HiLoQueryLaningStrategy.LOW,
strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
}
}


@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NoQueryLaningStrategyTest
{
private Druids.TimeseriesQueryBuilder queryBuilder;
private NoQueryLaningStrategy strategy;
@Before
public void setup()
{
this.queryBuilder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
.granularity(Granularities.DAY)
.aggregators(new CountAggregatorFactory("count"));
this.strategy = new NoQueryLaningStrategy();
}
@Test
public void testDoesntSetLane()
{
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of()).build();
Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
}
@Test
public void testPreservesManualLaneFromContext()
{
final String someLane = "some-lane";
TimeseriesQuery query = queryBuilder.context(
ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100, QueryContexts.LANE_KEY, someLane)
).build();
Assert.assertEquals(
someLane,
strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
}
}


@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycle;
import org.apache.druid.sql.SqlLifecycleFactory;
@ -171,6 +172,10 @@ public class SqlResource
throw new RuntimeException(e);
}
}
catch (QueryCapacityExceededException cap) {
lifecycle.emitLogsAndMetrics(cap, remoteAddr, -1);
return Response.status(QueryCapacityExceededException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(cap)).build();
}
catch (ForbiddenException e) {
throw e; // let ForbiddenExceptionMapper handle this
}
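For context, the 429 entity written above is just the exception serialized by the json mapper; a sketch of what that produces (the field names follow QueryException's JSON shape, and the exact values shown are an assumption, not a documented contract):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.QueryCapacityExceededException;

ObjectMapper mapper = new DefaultObjectMapper();
// same call the catch block uses for the response entity
byte[] entity = mapper.writeValueAsBytes(new QueryCapacityExceededException());
// assumed shape: {"error":"Query capacity exceeded","errorMessage":"Total query capacity exceeded",...}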


@ -106,6 +106,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
@ -700,6 +701,15 @@ public class CalciteTests
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir
)
{
return createMockWalker(conglomerate, tmpDir, null);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir,
@Nullable final QueryScheduler scheduler
)
{
final QueryableIndex index1 = IndexBuilder
.create()
@ -753,7 +763,8 @@ public class CalciteTests
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
null,
scheduler
).add(
DataSegment.builder()
.dataSource(DATASOURCE1)


@ -25,10 +25,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.InlineDataSource;
@ -63,6 +65,7 @@ import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@ -78,6 +81,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -98,18 +102,21 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
private final QueryRunnerFactoryConglomerate conglomerate;
private final QuerySegmentWalker walker;
private final JoinableFactory joinableFactory;
private final QueryScheduler scheduler;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
private final List<Closeable> closeables = new ArrayList<>();
private final List<DataSegment> segments = new ArrayList<>();
/**
* Create an instance using the provided query runner factory conglomerate and lookup provider.
* If a JoinableFactory is provided, it will be used instead of the default. If a scheduler is included,
* the runner will schedule queries according to the scheduling config.
*/
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory,
@Nullable final QueryScheduler scheduler
)
{
this.conglomerate = conglomerate;
@ -121,6 +128,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
.build()
) : joinableFactory;
this.scheduler = scheduler;
this.walker = new ClientQuerySegmentWalker(
new NoopServiceEmitter(),
new DataServerLikeWalker(),
@ -165,6 +173,20 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
);
}
/**
* Create an instance using the provided query runner factory conglomerate and lookup provider, without a
* {@link QueryScheduler}. If a JoinableFactory is provided, it will be used instead of the default.
*/
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory
)
{
this(conglomerate, lookupProvider, joinableFactory, null);
}
/**
* Create an instance without any lookups, using the default JoinableFactory
*/
@ -389,13 +411,33 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
toolChest
);
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
// to function properly.
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
return scheduler.run(
scheduler.laneQuery(theQuery, segments),
new LazySequence<>(
() -> baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(
theQuery.getQuery(),
ImmutableList.copyOf(specs)
)),
responseContext
)
)
);
} else {
return baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
responseContext
);
}
};
}
private <T> QueryRunner<T> makeTableRunner(


@ -25,6 +25,8 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
@ -33,13 +35,19 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycleFactory;
@ -67,9 +75,11 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -85,9 +95,11 @@ public class SqlResourceTest extends CalciteTestBase
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private SpecificSegmentsQuerySegmentWalker walker = null;
private QueryScheduler scheduler = null;
private TestRequestLogger testRequestLogger;
private SqlResource resource;
private HttpServletRequest req;
private ListeningExecutorService executorService;
@BeforeClass
public static void setUpClass()
@ -107,7 +119,11 @@ public class SqlResourceTest extends CalciteTestBase
@Before
public void setUp() throws Exception
{
executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(8, "test_sql_resource_%s")
);
scheduler = new QueryScheduler(5, new HiLoQueryLaningStrategy(40), new ServerConfig());
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder(), scheduler);
final PlannerConfig plannerConfig = new PlannerConfig()
{
@ -167,6 +183,7 @@ public class SqlResourceTest extends CalciteTestBase
{
walker.close();
walker = null;
executorService.shutdownNow();
}
@Test
@ -638,7 +655,7 @@ public class SqlResourceTest extends CalciteTestBase
@Test
public void testCannotValidate() throws Exception
{
final QueryException exception = doPost(
new SqlQuery(
"SELECT dim4 FROM druid.foo",
ResultFormat.OBJECT,
@ -659,7 +676,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testCannotConvert() throws Exception
{
// SELECT + ORDER unsupported
final QueryInterruptedException exception = doPost(
final QueryException exception = doPost(
new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, false, null, null)
).lhs;
@ -676,7 +693,7 @@ public class SqlResourceTest extends CalciteTestBase
@Test
public void testResourceLimitExceeded() throws Exception
{
final QueryInterruptedException exception = doPost(
final QueryException exception = doPost(
new SqlQuery(
"SELECT DISTINCT dim1 FROM foo",
ResultFormat.OBJECT,
@ -692,6 +709,56 @@ public class SqlResourceTest extends CalciteTestBase
checkSqlRequestLog(false);
}
@Test
public void testTooManyRequests() throws Exception
{
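// Submit 3 concurrent queries with negative priority; HiLoQueryLaningStrategy routes them
// to the 'low' lane, which only admits 2 at a time, so one request should be shed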
final int numQueries = 3;
List<Future<Pair<QueryException, List<Map<String, Object>>>>> futures = new ArrayList<>(numQueries);
for (int i = 0; i < numQueries; i++) {
futures.add(executorService.submit(() -> {
try {
return doPost(
new SqlQuery(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
null,
false,
ImmutableMap.of("priority", -5),
null
),
makeExpectedReq()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
int success = 0;
int limited = 0;
for (int i = 0; i < numQueries; i++) {
Pair<QueryException, List<Map<String, Object>>> result = futures.get(i).get();
List<Map<String, Object>> rows = result.rhs;
if (rows != null) {
Assert.assertEquals(ImmutableList.of(ImmutableMap.of("cnt", 6, "TheFoo", "foo")), rows);
success++;
} else {
QueryException interrupted = result.lhs;
Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, interrupted.getErrorCode());
Assert.assertEquals(
StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, HiLoQueryLaningStrategy.LOW),
interrupted.getMessage()
);
limited++;
}
}
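// Two queries fit in the low lane, the third is rejected with QueryCapacityExceededException,
// and all three requests are logged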
Assert.assertEquals(2, success);
Assert.assertEquals(1, limited);
Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
}
@SuppressWarnings("unchecked")
private void checkSqlRequestLog(boolean success)
{
@ -710,23 +777,53 @@ public class SqlResourceTest extends CalciteTestBase
}
}
private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
{
return doPost(query, new TypeReference<List<Map<String, Object>>>()
{
});
}
// Returns either an error or a result, assuming the result is a JSON object.
private <T> Pair<QueryInterruptedException, T> doPost(
private <T> Pair<QueryException, T> doPost(
final SqlQuery query,
final TypeReference<T> typeReference
) throws Exception
{
final Pair<QueryInterruptedException, String> pair = doPostRaw(query);
return doPost(query, req, typeReference);
}
private Pair<QueryException, String> doPostRaw(final SqlQuery query) throws Exception
{
return doPostRaw(query, req);
}
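// Overloads taking an explicit HttpServletRequest let concurrent tests supply a dedicated
// mock request per call (see makeExpectedReq) rather than sharing the single 'req' field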
private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query, HttpServletRequest req) throws Exception
{
return doPost(query, req, new TypeReference<List<Map<String, Object>>>()
{
});
}
// Returns either an error or a result, assuming the result is a JSON object.
private <T> Pair<QueryException, T> doPost(
final SqlQuery query,
final HttpServletRequest req,
final TypeReference<T> typeReference
) throws Exception
{
final Pair<QueryException, String> pair = doPostRaw(query, req);
if (pair.rhs == null) {
//noinspection unchecked
return (Pair<QueryInterruptedException, T>) pair;
return (Pair<QueryException, T>) pair;
} else {
return Pair.of(pair.lhs, JSON_MAPPER.readValue(pair.rhs, typeReference));
}
}
// Returns either an error or a result.
private Pair<QueryInterruptedException, String> doPostRaw(final SqlQuery query) throws Exception
private Pair<QueryException, String> doPostRaw(final SqlQuery query, final HttpServletRequest req) throws Exception
{
final Response response = resource.doPost(query, req);
if (response.getStatus() == 200) {
@ -739,16 +836,32 @@ public class SqlResourceTest extends CalciteTestBase
);
} else {
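// Non-200 responses carry a serialized QueryException (e.g. QueryCapacityExceededException)
// in the response body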
return Pair.of(
JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class),
JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryException.class),
null
);
}
}
private Pair<QueryInterruptedException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
private HttpServletRequest makeExpectedReq()
{
return doPost(query, new TypeReference<List<Map<String, Object>>>()
{
});
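// Strict mock: EasyMock verifies call order, so the authentication-result expectation is
// registered at several points in the expected call sequence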
HttpServletRequest req = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(req.getRemoteAddr()).andReturn(null).once();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
.anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
.andReturn(null)
.anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
.anyTimes();
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
.anyTimes();
EasyMock.replay(req);
return req;
}
}

View File

@ -260,6 +260,7 @@ javadoc
kerberos
keystore
keytab
laning
lifecycle
localhost
log4j