Make timeout behavior consistent to document (#4134)

* Make timeout behavior consistent to document

* Refactoring BlockingPool and add more methods to QueryContexts

* remove unused imports

* Addressed comments

* Address comments

* remove unused method

* Make default query timeout configurable

* Fix test failure

* Change timeout from period to millis
This commit is contained in:
Jihoon Son 2017-04-19 09:47:53 +09:00 committed by Gian Merlino
parent db656c5a88
commit 5b69f2eff2
37 changed files with 521 additions and 255 deletions

View File

@ -74,40 +74,57 @@ public class BlockingPool<T>
}
/**
* Take a resource from the pool.
* Take a resource from the pool, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
* @param timeoutMs maximum time to wait for a resource, in milliseconds.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout)
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
final T theObject;
try {
if (timeout > -1) {
theObject = timeout > 0 ? poll(timeout) : poll();
} else {
theObject = take();
}
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
private T poll()
/**
* Take a resource from the pool, waiting if necessary until an element becomes available.
*
* @return a resource
*/
public ReferenceCountingResourceHolder<T> take()
{
checkInitialized();
try {
return wrapObject(takeObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
}
private T pollObject()
{
final ReentrantLock lock = this.lock;
lock.lock();
@ -118,9 +135,9 @@ public class BlockingPool<T>
}
}
private T poll(long timeout) throws InterruptedException
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
@ -136,7 +153,7 @@ public class BlockingPool<T>
}
}
private T take() throws InterruptedException
private T takeObject() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
@ -151,41 +168,60 @@ public class BlockingPool<T>
}
/**
* Take a resource from the pool.
* Take resources from the pool, waiting up to the
* specified wait time if necessary for elements of the given number to become available.
*
* @param elementNum number of resources to take
* @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout.
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout)
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
final List<T> objects;
try {
if (timeout > -1) {
objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum);
} else {
objects = takeBatch(elementNum);
}
return objects == null ? null : new ReferenceCountingResourceHolder<>(
objects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(objects);
}
}
);
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
private List<T> pollBatch(int elementNum) throws InterruptedException
/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a resource
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
return wrapObjects(takeObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects)
{
return theObjects == null ? null : new ReferenceCountingResourceHolder<>(
theObjects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(theObjects);
}
}
);
}
private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
@ -204,9 +240,9 @@ public class BlockingPool<T>
}
}
private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException
private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
@ -226,7 +262,7 @@ public class BlockingPool<T>
}
}
private List<T> takeBatch(int elementNum) throws InterruptedException
private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;

View File

@ -36,6 +36,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|

View File

@ -48,6 +48,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
#### Processing

View File

@ -7,16 +7,16 @@ Query Context
The query context is used for various query configuration parameters. The following parameters apply to all queries.
|property |default | description |
|-----------------|---------------------|----------------------|
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|property |default | description |
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
In addition, some query types offer context parameters specific to that query type.

View File

@ -28,6 +28,7 @@ import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -65,6 +66,7 @@ public class ScanQueryEngine
return Sequences.empty();
}
}
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT);
final long start = System.currentTimeMillis();
final StorageAdapter adapter = segment.asStorageAdapter();
@ -156,7 +158,7 @@ public class ScanQueryEngine
@Override
public ScanResultValue next()
{
if (System.currentTimeMillis() >= timeoutAt) {
if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
throw new QueryInterruptedException(new TimeoutException());
}
long lastOffset = offset;
@ -173,10 +175,12 @@ public class ScanQueryEngine
ScanQueryRunnerFactory.CTX_COUNT,
(long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
);
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
if (hasTimeout) {
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
}
return new ScanResultValue(segmentId, allColumns, events);
}

View File

@ -25,7 +25,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -36,6 +36,8 @@ import java.util.concurrent.ExecutorService;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
// This variable indicates when a running query should be expired,
// and is effective only when 'timeout' of queryContext has a positive value.
public static final String CTX_TIMEOUT_AT = "timeoutAt";
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
@ -71,9 +73,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
final Query<ScanResultValue> query, final Map<String, Object> responseContext
)
{
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L)
? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue();
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query);
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Sequences.map(
@ -122,7 +124,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
};
}
return engine.process((ScanQuery) query, segment, responseContext);
}
}

View File

@ -49,7 +49,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
@ -68,11 +68,10 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
public Sequence<T> get()
{
try {
Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT);
if (timeout == null) {
return future.get();
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
return future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
return future.get();
}
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
throw Throwables.propagate(ex);

View File

@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Duration;
@ -37,66 +36,6 @@ import java.util.Map;
*/
public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
{
public static <T> int getContextPriority(Query<T> query, int defaultValue)
{
return parseInt(query, "priority", defaultValue);
}
public static <T> boolean getContextBySegment(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "bySegment", defaultValue);
}
public static <T> boolean getContextPopulateCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateCache", defaultValue);
}
public static <T> boolean getContextUseCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useCache", defaultValue);
}
public static <T> boolean getContextFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
}
public static <T> int getContextUncoveredIntervalsLimit(Query<T> query, int defaultValue)
{
return parseInt(query, "uncoveredIntervalsLimit", defaultValue);
}
private static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
private static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
public static void checkInterrupted()
{
if (Thread.interrupted()) {
@ -203,7 +142,7 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
@Override
public boolean getContextBoolean(String key, boolean defaultValue)
{
return parseBoolean(this, key, defaultValue);
return QueryContexts.parseBoolean(this, key, defaultValue);
}
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
@ -237,6 +176,12 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
}
@Override
public Query<T> withDefaultTimeout(long defaultTimeout)
{
return withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout));
}
@Override
public boolean equals(Object o)
{

View File

@ -51,7 +51,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
if (BaseQuery.getContextBySegment(query, false)) {
if (QueryContexts.isBySegment(query)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(

View File

@ -40,7 +40,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
if (BaseQuery.getContextBySegment(query, false)) {
if (QueryContexts.isBySegment(query)) {
return baseRunner.run(query, responseContext);
}

View File

@ -91,7 +91,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
final Ordering ordering = query.getResultOrdering();
return new BaseSequence<T, Iterator<T>>(
@ -152,12 +152,11 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
queryWatcher.registerQuery(query, futures);
try {
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null);
return new MergeIterable<>(
ordering.nullsFirst(),
timeout == null ?
futures.get() :
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS)
QueryContexts.hasTimeout(query) ?
futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
futures.get()
).iterator();
}
catch (InterruptedException e) {

View File

@ -49,8 +49,8 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true);
final boolean isBySegment = QueryContexts.isBySegment(query);
final boolean shouldFinalize = QueryContexts.isFinalize(query, true);
final Query<T> queryToRun;
final Function<T, T> finalizerFn;

View File

@ -90,8 +90,8 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
true
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = BaseQuery.getContextBySegment(query, false);
final int priority = BaseQuery.getContextPriority(query, 0);
final boolean bySegment = QueryContexts.isBySegment(query);
final int priority = QueryContexts.getPriority(query);
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
@ -178,11 +178,10 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
{
try {
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null);
if (timeout == null) {
future.get();
if (QueryContexts.hasTimeout(query)) {
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
future.get();
}
}
catch (InterruptedException e) {

View File

@ -148,7 +148,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
private Period getChunkPeriod(Query<T> query)
{
String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D");
final String p = QueryContexts.getChunkPeriod(query);
return Period.parse(p);
}
}

View File

@ -99,4 +99,6 @@ public interface Query<T>
String getId();
Query<T> withDataSource(DataSource dataSource);
Query<T> withDefaultTimeout(long defaultTimeout);
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
public class QueryContextKeys
{
public static final String PRIORITY = "priority";
public static final String TIMEOUT = "timeout";
public static final String CHUNK_PERIOD = "chunkPeriod";
}

View File

@ -0,0 +1,168 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.ISE;
public class QueryContexts
{
public static final String PRIORITY_KEY = "priority";
public static final String TIMEOUT_KEY = "timeout";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
public static final long NO_TIMEOUT = 0;
public static <T> boolean isBySegment(Query<T> query)
{
return isBySegment(query, DEFAULT_BY_SEGMENT);
}
public static <T> boolean isBySegment(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "bySegment", defaultValue);
}
public static <T> boolean isPopulateCache(Query<T> query)
{
return isPopulateCache(query, DEFAULT_POPULATE_CACHE);
}
public static <T> boolean isPopulateCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateCache", defaultValue);
}
public static <T> boolean isUseCache(Query<T> query)
{
return isUseCache(query, DEFAULT_USE_CACHE);
}
public static <T> boolean isUseCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useCache", defaultValue);
}
public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
}
public static <T> int getUncoveredIntervalsLimit(Query<T> query)
{
return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT);
}
public static <T> int getUncoveredIntervalsLimit(Query<T> query, int defaultValue)
{
return parseInt(query, "uncoveredIntervalsLimit", defaultValue);
}
public static <T> int getPriority(Query<T> query)
{
return getPriority(query, DEFAULT_PRIORITY);
}
public static <T> int getPriority(Query<T> query, int defaultValue)
{
return parseInt(query, PRIORITY_KEY, defaultValue);
}
public static <T> String getChunkPeriod(Query<T> query)
{
return query.getContextValue(CHUNK_PERIOD_KEY, "P0D");
}
public static <T> boolean hasTimeout(Query<T> query)
{
return getTimeout(query) != NO_TIMEOUT;
}
public static <T> long getTimeout(Query<T> query)
{
return getTimeout(query, getDefaultTimeout(query));
}
public static <T> long getTimeout(Query<T> query, long defaultTimeout)
{
final long timeout = parseLong(query, TIMEOUT_KEY, defaultTimeout);
Preconditions.checkState(timeout >= 0, "Timeout must be a non negative value, but was [%d]", timeout);
return timeout;
}
static <T> long getDefaultTimeout(Query<T> query)
{
final long defaultTimeout = parseLong(query, DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS);
Preconditions.checkState(defaultTimeout >= 0, "Timeout must be a non negative value, but was [%d]", defaultTimeout);
return defaultTimeout;
}
static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Long.parseLong((String) val);
} else if (val instanceof Number) {
return ((Number) val).longValue();
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Number) {
return ((Number) val).intValue();
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
}

View File

@ -38,11 +38,11 @@ import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -115,7 +115,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
if (BaseQuery.getContextBySegment(query, false)) {
if (QueryContexts.isBySegment(query)) {
return runner.run(query, responseContext);
}
@ -204,7 +204,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
);
final Sequence<Row> finalizingResults;
if (GroupByQuery.getContextFinalize(subquery, false)) {
if (QueryContexts.isFinalize(subquery, false)) {
finalizingResults = new MappedSequence<>(
subqueryResult,
makePreComputeManipulatorFn(

View File

@ -35,7 +35,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
@ -45,10 +44,9 @@ import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
@ -125,7 +123,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
ImmutableMap.<String, Object>of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)
);
if (BaseQuery.getContextBySegment(query, false) || forceChainedExecution) {
if (QueryContexts.isBySegment(query) || forceChainedExecution) {
return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext);
}
@ -141,14 +139,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
);
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
// Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual
// query processing together.
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = queryTimeout == null
? JodaUtils.MAX_INSTANT
: System.currentTimeMillis() + queryTimeout.longValue();
final long queryTimeout = QueryContexts.getTimeout(query);
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final long timeoutAt = System.currentTimeMillis() + queryTimeout;
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
@ -170,9 +167,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// This will potentially block if there are no merge buffers left in the pool.
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new TimeoutException();
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new TimeoutException();
}
} else {
mergeBufferHolder = mergeBufferPool.take();
}
resources.add(mergeBufferHolder);
}
@ -248,6 +249,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
waitForFutureCompletion(
query,
Futures.allAsList(ImmutableList.of(future)),
hasTimeout,
timeoutAt - System.currentTimeMillis()
);
}
@ -260,7 +262,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
);
if (!isSingleThreaded) {
waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis());
}
return RowBasedGrouperHelper.makeGrouperIterator(
@ -299,6 +301,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
private void waitForFutureCompletion(
GroupByQuery query,
ListenableFuture<List<AggregateResult>> future,
boolean hasTimeout,
long timeout
)
{
@ -307,11 +310,11 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
queryWatcher.registerQuery(query, future);
}
if (timeout <= 0) {
if (hasTimeout && timeout <= 0) {
throw new TimeoutException();
}
final List<AggregateResult> results = future.get(timeout, TimeUnit.MILLISECONDS);
final List<AggregateResult> results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
for (AggregateResult result : results) {
if (!result.isOk()) {

View File

@ -31,7 +31,6 @@ import com.google.inject.Inject;
import io.druid.collections.BlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
@ -47,7 +46,7 @@ import io.druid.query.DruidProcessingConfig;
import io.druid.query.InsufficientResourcesException;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
@ -141,10 +140,12 @@ public class GroupByStrategyV2 implements GroupByStrategy
} else if (requiredMergeBufferNum == 0) {
return new GroupByQueryResource();
} else {
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT);
final ResourceHolder<List<ByteBuffer>> mergeBufferHolders = mergeBufferPool.takeBatch(
requiredMergeBufferNum, timeout.longValue()
);
final ResourceHolder<List<ByteBuffer>> mergeBufferHolders;
if (QueryContexts.hasTimeout(query)) {
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query));
} else {
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum);
}
if (mergeBufferHolders == null) {
throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
} else {

View File

@ -32,10 +32,9 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
@ -202,7 +201,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
final Map<String, Object> responseContext
)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
{
@ -210,15 +209,18 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
public Sequence<SegmentAnalysis> call() throws Exception
{
return Sequences.simple(
Sequences.toList(input.run(query, responseContext), new ArrayList<SegmentAnalysis>())
Sequences.toList(input.run(query, responseContext), new ArrayList<>())
);
}
}
);
try {
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
return future.get();
}
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());

View File

@ -35,12 +35,12 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -391,7 +391,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
return runner.run(query, responseContext);
}
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final boolean isBySegment = QueryContexts.isBySegment(query);
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext),

View File

@ -32,11 +32,11 @@ import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValue;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -535,7 +535,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return runner.run(query, responseContext);
}
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final boolean isBySegment = QueryContexts.isBySegment(query);
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold), responseContext),

View File

@ -100,7 +100,7 @@ public class AsyncQueryRunnerTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER);
Sequence lazy = asyncRunner.run(
query.withOverriddenContext(ImmutableMap.<String,Object>of(QueryContextKeys.TIMEOUT, 1)),
query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)),
Collections.EMPTY_MAP);
try {

View File

@ -251,7 +251,7 @@ public class ChainedExecutionQueryRunnerTest
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.context(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, 100, "queryId", "test"))
.context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test"))
.build(),
context
);

View File

@ -0,0 +1,127 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class QueryContextsTest
{
private static class TestQuery extends BaseQuery
{
public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context)
{
super(dataSource, querySegmentSpec, descending, context);
}
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public Query withQuerySegmentSpec(QuerySegmentSpec spec)
{
return null;
}
@Override
public Query withDataSource(DataSource dataSource)
{
return null;
}
@Override
public Query withOverriddenContext(Map contextOverride)
{
return new TestQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
computeOverridenContext(contextOverride)
);
}
}
@Test
public void testDefaultQueryTimeout()
{
final Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))),
false,
new HashMap()
);
Assert.assertEquals(300_000, QueryContexts.getDefaultTimeout(query));
}
@Test
public void testEmptyQueryTimeout()
{
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))),
false,
new HashMap()
);
Assert.assertEquals(300_000, QueryContexts.getTimeout(query));
query = query.withDefaultTimeout(60_000);
Assert.assertEquals(60_000, QueryContexts.getTimeout(query));
}
@Test
public void testQueryTimeout()
{
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))),
false,
ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000)
);
Assert.assertEquals(1000, QueryContexts.getTimeout(query));
query = query.withDefaultTimeout(1_000_000);
Assert.assertEquals(1000, QueryContexts.getTimeout(query));
}
}

View File

@ -33,6 +33,7 @@ import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Druids;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -101,7 +102,7 @@ public class DataSourceMetadataQueryTest
), Query.class
);
Assert.assertEquals(1, serdeQuery.getContextValue("priority"));
Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY));
Assert.assertEquals(true, serdeQuery.getContextValue("useCache"));
Assert.assertEquals("true", serdeQuery.getContextValue("populateCache"));
Assert.assertEquals(true, serdeQuery.getContextValue("finalize"));

View File

@ -32,7 +32,7 @@ import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@ -43,7 +43,6 @@ import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import org.bouncycastle.util.Integers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -230,7 +229,7 @@ public class GroupByQueryMergeBufferTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
@ -258,7 +257,7 @@ public class GroupByQueryMergeBufferTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
@ -297,7 +296,7 @@ public class GroupByQueryMergeBufferTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
@ -349,7 +348,7 @@ public class GroupByQueryMergeBufferTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);

View File

@ -33,7 +33,7 @@ import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.InsufficientResourcesException;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
@ -46,7 +46,6 @@ import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import org.bouncycastle.util.Integers;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
@ -201,7 +200,7 @@ public class GroupByQueryRunnerFailureTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
@ -239,7 +238,7 @@ public class GroupByQueryRunnerFailureTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
@ -265,7 +264,7 @@ public class GroupByQueryRunnerFailureTest
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
try {

View File

@ -52,6 +52,7 @@ import io.druid.query.DruidProcessingConfig;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@ -1213,7 +1214,7 @@ public class GroupByQueryRunnerTest
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setContext(ImmutableMap.<String, Object>of("timeout", Integer.valueOf(60000)))
.setContext(ImmutableMap.<String, Object>of(QueryContexts.TIMEOUT_KEY, 60000))
.build();
List<Row> expectedResults = Arrays.asList(
@ -5421,7 +5422,7 @@ public class GroupByQueryRunnerTest
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("count")))
.setGranularity(QueryRunnerTestHelper.allGran)
.setContext(ImmutableMap.<String, Object>of("timeout", 10000))
.setContext(ImmutableMap.<String, Object>of(QueryContexts.TIMEOUT_KEY, 10000))
.build();
List<Row> expectedResults = Arrays.asList(

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import org.junit.Assert;
import org.junit.Test;
@ -78,7 +79,7 @@ public class TimeBoundaryQueryTest
);
Assert.assertEquals(new Integer(1), serdeQuery.getContextValue("priority"));
Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY));
Assert.assertEquals(true, serdeQuery.getContextValue("useCache"));
Assert.assertEquals(true, serdeQuery.getContextValue("populateCache"));
Assert.assertEquals(true, serdeQuery.getContextValue("finalize"));
@ -116,7 +117,7 @@ public class TimeBoundaryQueryTest
);
Assert.assertEquals("1", serdeQuery.getContextValue("priority"));
Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY));
Assert.assertEquals("true", serdeQuery.getContextValue("useCache"));
Assert.assertEquals("true", serdeQuery.getContextValue("populateCache"));
Assert.assertEquals("true", serdeQuery.getContextValue("finalize"));

View File

@ -24,9 +24,9 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
@ -123,7 +123,7 @@ public class CacheUtil
CacheConfig cacheConfig
)
{
return BaseQuery.getContextUseCache(query, true)
return QueryContexts.isUseCache(query)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
@ -135,7 +135,7 @@ public class CacheUtil
CacheConfig cacheConfig
)
{
return BaseQuery.getContextPopulateCache(query, true)
return QueryContexts.isPopulateCache(query)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);

View File

@ -55,10 +55,10 @@ import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -151,12 +151,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final boolean isBySegment = QueryContexts.isBySegment(query);
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final int priority = BaseQuery.getContextPriority(query, 0);
contextBuilder.put("priority", priority);
final int priority = QueryContexts.getPriority(query);
contextBuilder.put(QueryContexts.PRIORITY_KEY, priority);
if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
@ -177,7 +177,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/druid-io/druid/issues/2108
int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
if (uncoveredIntervalsLimit > 0) {
List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);

View File

@ -50,9 +50,9 @@ import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
@ -134,7 +134,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = BaseQuery.getContextBySegment(query, false);
boolean isBySegment = QueryContexts.isBySegment(query);
Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) {

View File

@ -40,7 +40,6 @@ import io.druid.java.util.common.guava.Yielders;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QuerySegmentWalker;
@ -177,7 +176,7 @@ public class QueryResource implements QueryCountStatsProvider
) throws IOException
{
final long startNs = System.nanoTime();
Query query = null;
Query<?> query = null;
QueryToolChest toolChest = null;
String queryId = null;
@ -191,14 +190,8 @@ public class QueryResource implements QueryCountStatsProvider
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
}
if (query.getContextValue(QueryContextKeys.TIMEOUT) == null) {
query = query.withOverriddenContext(
ImmutableMap.of(
QueryContextKeys.TIMEOUT,
config.getMaxIdleTime().toStandardDuration().getMillis()
)
);
}
query = query.withDefaultTimeout(config.getDefaultQueryTimeout());
toolChest = warehouse.getToolChest(query);
Thread.currentThread()

View File

@ -37,6 +37,10 @@ public class ServerConfig
@NotNull
private Period maxIdleTime = new Period("PT5m");
@JsonProperty
@Min(0)
private long defaultQueryTimeout = 300_000; // 5 minutes
public int getNumThreads()
{
return numThreads;
@ -47,12 +51,18 @@ public class ServerConfig
return maxIdleTime;
}
public long getDefaultQueryTimeout()
{
return defaultQueryTimeout;
}
@Override
public String toString()
{
return "ServerConfig{" +
"numThreads=" + numThreads +
", maxIdleTime=" + maxIdleTime +
'}';
"numThreads=" + numThreads +
", maxIdleTime=" + maxIdleTime +
", defaultQueryTimeout=" + defaultQueryTimeout +
'}';
}
}

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
/**
*/
@ -46,7 +46,7 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final int priority = QueryContexts.getPriority(query);
if (priority < minPriority) {
return Optional.of(