diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 9a883da35cf..af48f53cf09 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -74,40 +74,57 @@ public class BlockingPool<T> } /** - * Take a resource from the pool. + * Take a resource from the pool, waiting up to the + * specified wait time if necessary for an element to become available. * - * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. + * @param timeoutMs maximum time to wait for a resource, in milliseconds. * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder<T> take(final long timeout) + public ReferenceCountingResourceHolder<T> take(final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); - final T theObject; try { - if (timeout > -1) { - theObject = timeout > 0 ? poll(timeout) : poll(); - } else { - theObject = take(); - } - return theObject == null ? null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException - { - offer(theObject); - } - } - ); + return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private T poll() + /** + * Take a resource from the pool, waiting if necessary until an element becomes available. + * + * @return a resource + */ + public ReferenceCountingResourceHolder<T> take() + { + checkInitialized(); + try { + return wrapObject(takeObject()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder<T> wrapObject(T theObject) + { + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() + { + @Override + public void close() throws IOException + { + offer(theObject); + } + } + ); + } + + private T pollObject() { final ReentrantLock lock = this.lock; lock.lock(); @@ -118,9 +135,9 @@ public class BlockingPool<T> } } - private T poll(long timeout) throws InterruptedException + private T pollObject(long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -136,7 +153,7 @@ public class BlockingPool<T> } } - private T take() throws InterruptedException + private T takeObject() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); @@ -151,41 +168,60 @@ public class BlockingPool<T> } /** - * Take a resource from the pool. + * Take resources from the pool, waiting up to the + * specified wait time if necessary for the given number of elements to become available. * * @param elementNum number of resources to take - * @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout. + * @param timeoutMs maximum time to wait for resources, in milliseconds.
* * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout) + public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); - final List<T> objects; try { - if (timeout > -1) { - objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum); - } else { - objects = takeBatch(elementNum); - } - return objects == null ? null : new ReferenceCountingResourceHolder<>( - objects, - new Closeable() - { - @Override - public void close() throws IOException - { - offerBatch(objects); - } - } - ); + return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private List<T> pollBatch(int elementNum) throws InterruptedException + /** + * Take resources from the pool, waiting if necessary until the given number of elements becomes available. + * + * @param elementNum number of resources to take + * + * @return a holder containing a list of resources + */ + public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum) + { + checkInitialized(); + try { + return wrapObjects(takeObjects(elementNum)); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects) + { + return theObjects == null ? null : new ReferenceCountingResourceHolder<>( + theObjects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(theObjects); + } + } + ); + } + + private List<T> pollObjects(int elementNum) throws InterruptedException { final List<T> list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; @@ -204,9 +240,9 @@ public class BlockingPool<T> } } - private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException + private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final List<T> list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -226,7 +262,7 @@ public class BlockingPool<T> } } - private List<T> takeBatch(int elementNum) throws InterruptedException + private List<T> takeObjects(int elementNum) throws InterruptedException { final List<T> list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 676d7ac31a0..fa6a01b5647 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -36,6 +36,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout in milliseconds, beyond which unfinished queries will be cancelled.|300000| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes.
If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 7ad71ae9ea0..e8b8832a478 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -48,6 +48,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout in milliseconds, beyond which unfinished queries will be cancelled.|300000| #### Processing diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 0c475217a72..b93ef4dd325 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -7,16 +7,16 @@ Query Context The query context is used for various query configuration parameters. The following parameters apply to all queries. -|property |default | description | -|-----------------|---------------------|----------------------| -|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. | -|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| -|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | -|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | -|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine.
| +|property |default | description | +|-----------------|----------------------------------------|----------------------| +|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in milliseconds, beyond which unfinished queries will be cancelled. A timeout of `0` means no timeout. To set the default timeout, see [broker configuration](broker.html) | +|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| +|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | +|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overridden in the broker or historical node configuration | +|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | In addition, some query types offer context parameters specific to that query type.
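For concreteness, here is a minimal sketch of setting the per-query timeout through the query context. The builder calls mirror the test changes later in this diff (ChainedExecutionQueryRunnerTest and GroupByQueryRunnerTest); the TimeoutContextExample class name and the 60-second value are illustrative assumptions, not part of the patch.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.aggregation.CountAggregatorFactory;

public class TimeoutContextExample
{
  public static Query<?> buildQueryWithTimeout()
  {
    // An explicit "timeout" of 60_000 ms overrides the server-side default
    // (druid.server.http.defaultQueryTimeout); a value of 0 disables the timeout.
    // Omitting the key entirely falls back to the default timeout.
    return Druids.newTimeseriesQueryBuilder()
                 .dataSource("test")
                 .intervals("2014/2015")
                 .aggregators(Lists.newArrayList(new CountAggregatorFactory("count")))
                 .context(ImmutableMap.<String, Object>of(QueryContexts.TIMEOUT_KEY, 60_000))
                 .build();
  }
}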
diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 139cc6d0ad0..57385f50c11 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -65,6 +66,7 @@ public class ScanQueryEngine return Sequences.empty(); } } + final boolean hasTimeout = QueryContexts.hasTimeout(query); final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -156,7 +158,7 @@ public class ScanQueryEngine @Override public ScanResultValue next() { - if (System.currentTimeMillis() >= timeoutAt) { + if (hasTimeout && System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } long lastOffset = offset; @@ -173,10 +175,12 @@ public class ScanQueryEngine ScanQueryRunnerFactory.CTX_COUNT, (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) ); - responseContext.put( - ScanQueryRunnerFactory.CTX_TIMEOUT_AT, - timeoutAt - (System.currentTimeMillis() - start) - ); + if (hasTimeout) { + responseContext.put( + ScanQueryRunnerFactory.CTX_TIMEOUT_AT, + timeoutAt - (System.currentTimeMillis() - start) + ); + } return new ScanResultValue(segmentId, allColumns, events); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 712249eac5b..fcfe11b4366 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -25,7 +25,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -36,6 +36,8 @@ import java.util.concurrent.ExecutorService; public class ScanQueryRunnerFactory implements QueryRunnerFactory { + // This value indicates the time at which a running query should time out, + // and is effective only when the 'timeout' in the query context has a positive value. public static final String CTX_TIMEOUT_AT = "timeoutAt"; public static final String CTX_COUNT = "count"; private final ScanQueryQueryToolChest toolChest; @@ -71,9 +73,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory query, final Map responseContext ) { - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L) - ?
JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); + // Note: this variable is effective only when queryContext has a timeout. + // See the comment of CTX_TIMEOUT_AT. + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( @@ -122,7 +124,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory implements QueryRunner @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override @@ -68,11 +68,10 @@ public class AsyncQueryRunner implements QueryRunner public Sequence get() { try { - Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT); - if (timeout == null) { - return future.get(); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - return future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + return future.get(); } } catch (ExecutionException | InterruptedException | TimeoutException ex) { throw Throwables.propagate(ex); diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 22d0fb1a4ba..edb1ca5bf32 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; @@ -37,66 +36,6 @@ import java.util.Map; */ public abstract class BaseQuery> implements Query { - public static int getContextPriority(Query query, int defaultValue) - { - return parseInt(query, "priority", defaultValue); - } - - public static boolean getContextBySegment(Query query, boolean defaultValue) - { - return parseBoolean(query, "bySegment", defaultValue); - } - - public static boolean getContextPopulateCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "populateCache", defaultValue); - } - - public static boolean getContextUseCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "useCache", defaultValue); - } - - public static boolean getContextFinalize(Query query, boolean defaultValue) - { - return parseBoolean(query, "finalize", defaultValue); - } - - public static int getContextUncoveredIntervalsLimit(Query query, int defaultValue) - { - return parseInt(query, "uncoveredIntervalsLimit", defaultValue); - } - - private static int parseInt(Query query, String key, int defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Integer.parseInt((String) val); - } else if (val instanceof Integer) { - return (int) val; - } else { - throw new ISE("Unknown type [%s]", val.getClass()); - } - } - - private static boolean parseBoolean(Query query, String key, boolean defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return 
Boolean.parseBoolean((String) val); - } else if (val instanceof Boolean) { - return (boolean) val; - } else { - throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); - } - } - public static void checkInterrupted() { if (Thread.interrupted()) { @@ -203,7 +142,7 @@ public abstract class BaseQuery> implements Query @Override public boolean getContextBoolean(String key, boolean defaultValue) { - return parseBoolean(this, key, defaultValue); + return QueryContexts.parseBoolean(this, key, defaultValue); } protected Map computeOverridenContext(Map overrides) @@ -237,6 +176,12 @@ public abstract class BaseQuery> implements Query return withOverriddenContext(ImmutableMap.of(QUERYID, id)); } + @Override + public Query withDefaultTimeout(long defaultTimeout) + { + return withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index a2a8a960b4f..e57b9471ce3 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -51,7 +51,7 @@ public class BySegmentQueryRunner implements QueryRunner @SuppressWarnings("unchecked") public Sequence run(final Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index c49ced5aa46..373c2e3b119 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -40,7 +40,7 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner @Override public Sequence run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return baseRunner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 45363cd3a5c..117d6436082 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -91,7 +91,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); return new BaseSequence>( @@ -152,12 +152,11 @@ public class ChainedExecutionQueryRunner implements QueryRunner queryWatcher.registerQuery(query, futures); try { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); return new MergeIterable<>( ordering.nullsFirst(), - timeout == null ? - futures.get() : - futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) + QueryContexts.hasTimeout(query) ? 
+ futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) : + futures.get() ).iterator(); } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 1035a4ed751..7fe58ee06b9 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,8 +49,8 @@ public class FinalizeResultsQueryRunner implements QueryRunner @Override public Sequence run(final Query query, Map responseContext) { - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); - final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true); + final boolean isBySegment = QueryContexts.isBySegment(query); + final boolean shouldFinalize = QueryContexts.isFinalize(query, true); final Query queryToRun; final Function finalizerFn; diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index d416589f199..77775f295c6 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -90,8 +90,8 @@ public class GroupByMergedQueryRunner implements QueryRunner true ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final boolean bySegment = BaseQuery.getContextBySegment(query, false); - final int priority = BaseQuery.getContextPriority(query, 0); + final boolean bySegment = QueryContexts.isBySegment(query); + final int priority = QueryContexts.getPriority(query); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -178,11 +178,10 @@ public class GroupByMergedQueryRunner implements QueryRunner { try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - if (timeout == null) { - future.get(); + if (QueryContexts.hasTimeout(query)) { + future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + future.get(); } } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index a42ce69fde0..5e8b529b153 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -148,7 +148,7 @@ public class IntervalChunkingQueryRunner implements QueryRunner private Period getChunkPeriod(Query query) { - String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D"); + final String p = QueryContexts.getChunkPeriod(query); return Period.parse(p); } } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9ad178161ea..cfbf6f2d340 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -99,4 +99,6 @@ public interface Query String getId(); Query withDataSource(DataSource dataSource); + + Query withDefaultTimeout(long defaultTimeout); } diff --git a/processing/src/main/java/io/druid/query/QueryContextKeys.java b/processing/src/main/java/io/druid/query/QueryContextKeys.java deleted file mode 100644 
index 480dcd551f4..00000000000 --- a/processing/src/main/java/io/druid/query/QueryContextKeys.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query; - -public class QueryContextKeys -{ - public static final String PRIORITY = "priority"; - public static final String TIMEOUT = "timeout"; - public static final String CHUNK_PERIOD = "chunkPeriod"; -} diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java new file mode 100644 index 00000000000..b59c6bc2a2b --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -0,0 +1,168 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query; + +import com.google.common.base.Preconditions; +import io.druid.java.util.common.ISE; + +public class QueryContexts +{ + public static final String PRIORITY_KEY = "priority"; + public static final String TIMEOUT_KEY = "timeout"; + public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; + public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; + + public static final boolean DEFAULT_BY_SEGMENT = false; + public static final boolean DEFAULT_POPULATE_CACHE = true; + public static final boolean DEFAULT_USE_CACHE = true; + public static final int DEFAULT_PRIORITY = 0; + public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; + public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes + public static final long NO_TIMEOUT = 0; + + public static boolean isBySegment(Query query) + { + return isBySegment(query, DEFAULT_BY_SEGMENT); + } + + public static boolean isBySegment(Query query, boolean defaultValue) + { + return parseBoolean(query, "bySegment", defaultValue); + } + + public static boolean isPopulateCache(Query query) + { + return isPopulateCache(query, DEFAULT_POPULATE_CACHE); + } + + public static boolean isPopulateCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateCache", defaultValue); + } + + public static boolean isUseCache(Query query) + { + return isUseCache(query, DEFAULT_USE_CACHE); + } + + public static boolean isUseCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useCache", defaultValue); + } + + public static boolean isFinalize(Query query, boolean defaultValue) + { + return parseBoolean(query, "finalize", defaultValue); + } + + public static int getUncoveredIntervalsLimit(Query query) + { + return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT); + } + + public static int getUncoveredIntervalsLimit(Query query, int defaultValue) + { + return parseInt(query, "uncoveredIntervalsLimit", defaultValue); + } + + public static int getPriority(Query query) + { + return getPriority(query, DEFAULT_PRIORITY); + } + + public static int getPriority(Query query, int defaultValue) + { + return parseInt(query, PRIORITY_KEY, defaultValue); + } + + public static String getChunkPeriod(Query query) + { + return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); + } + + public static boolean hasTimeout(Query query) + { + return getTimeout(query) != NO_TIMEOUT; + } + + public static long getTimeout(Query query) + { + return getTimeout(query, getDefaultTimeout(query)); + } + + public static long getTimeout(Query query, long defaultTimeout) + { + final long timeout = parseLong(query, TIMEOUT_KEY, defaultTimeout); + Preconditions.checkState(timeout >= 0, "Timeout must be a non-negative value, but was [%s]", timeout); + return timeout; + } + + static long getDefaultTimeout(Query query) + { + final long defaultTimeout = parseLong(query, DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS); + Preconditions.checkState(defaultTimeout >= 0, "Default timeout must be a non-negative value, but was [%s]", defaultTimeout); + return defaultTimeout; + } + + static long parseLong(Query query, String key, long defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Long.parseLong((String) val); + } else if (val instanceof Number) { + return ((Number) val).longValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static int parseInt(Query query, String key, int
defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Integer.parseInt((String) val); + } else if (val instanceof Number) { + return ((Number) val).intValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static boolean parseBoolean(Query query, String key, boolean defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Boolean) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 85f4cf559ad..ddf9c4277c3 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -38,11 +38,11 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -115,7 +115,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return runner.run(query, responseContext); } @@ -204,7 +204,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest finalizingResults; - if (GroupByQuery.getContextFinalize(subquery, false)) { + if (QueryContexts.isFinalize(subquery, false)) { finalizingResults = new MappedSequence<>( subqueryResult, makePreComputeManipulatorFn( diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index fb0fe1327b9..de6fee13b21 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -45,10 +44,9 @@ import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import 
io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -125,7 +123,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) ); - if (BaseQuery.getContextBySegment(query, false) || forceChainedExecution) { + if (QueryContexts.isBySegment(query) || forceChainedExecution) { return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); } @@ -141,14 +139,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual // query processing together. - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = queryTimeout == null - ? JodaUtils.MAX_INSTANT - : System.currentTimeMillis() + queryTimeout.longValue(); + final long queryTimeout = QueryContexts.getTimeout(query); + final boolean hasTimeout = QueryContexts.hasTimeout(query); + final long timeoutAt = System.currentTimeMillis() + queryTimeout; return new BaseSequence<>( new BaseSequence.IteratorMaker>() @@ -170,9 +167,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner final ReferenceCountingResourceHolder mergeBufferHolder; try { // This will potentially block if there are no merge buffers left in the pool. - final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new TimeoutException(); + if (hasTimeout) { + final long timeout = timeoutAt - System.currentTimeMillis(); + if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { + throw new TimeoutException(); + } + } else { + mergeBufferHolder = mergeBufferPool.take(); } resources.add(mergeBufferHolder); } @@ -248,6 +249,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner waitForFutureCompletion( query, Futures.allAsList(ImmutableList.of(future)), + hasTimeout, timeoutAt - System.currentTimeMillis() ); } @@ -260,7 +262,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ); if (!isSingleThreaded) { - waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis()); + waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis()); } return RowBasedGrouperHelper.makeGrouperIterator( @@ -299,6 +301,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private void waitForFutureCompletion( GroupByQuery query, ListenableFuture> future, + boolean hasTimeout, long timeout ) { @@ -307,11 +310,11 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner queryWatcher.registerQuery(query, future); } - if (timeout <= 0) { + if (hasTimeout && timeout <= 0) { throw new TimeoutException(); } - final List results = future.get(timeout, TimeUnit.MILLISECONDS); + final List results = hasTimeout ? 
future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); for (AggregateResult result : results) { if (!result.isOk()) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 45dbd954d1c..e0e7273f572 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -31,7 +31,6 @@ import com.google.inject.Inject; import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -47,7 +46,7 @@ import io.druid.query.DruidProcessingConfig; import io.druid.query.InsufficientResourcesException; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -141,10 +140,12 @@ public class GroupByStrategyV2 implements GroupByStrategy } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); - final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( - requiredMergeBufferNum, timeout.longValue() - ); + final ResourceHolder> mergeBufferHolders; + if (QueryContexts.hasTimeout(query)) { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query)); + } else { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); + } if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 5620219f8b0..475d98453df 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -32,10 +32,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -202,7 +201,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory responseContext ) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @@ -210,15 +209,18 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(query, responseContext), new ArrayList()) + 
Sequences.toList(input.run(query, responseContext), new ArrayList<>()) ); } } ); try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); + } else { + return future.get(); + } } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index f5ab114f687..d84a7396e3e 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -35,12 +35,12 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -391,7 +391,7 @@ public class SearchQueryQueryToolChest extends QueryToolChestof(QueryContextKeys.TIMEOUT, 1)), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 67918aef481..311aaaa798f 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -251,7 +251,7 @@ public class ChainedExecutionQueryRunnerTest .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContextKeys.TIMEOUT, 100, "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java new file mode 100644 index 00000000000..c656f077110 --- /dev/null +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class QueryContextsTest +{ + private static class TestQuery extends BaseQuery + { + + public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return new TestQuery( + getDataSource(), + getQuerySegmentSpec(), + isDescending(), + computeOverridenContext(contextOverride) + ); + } + } + + @Test + public void testDefaultQueryTimeout() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(300_000, QueryContexts.getDefaultTimeout(query)); + } + + @Test + public void testEmptyQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(300_000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(60_000); + Assert.assertEquals(60_000, QueryContexts.getTimeout(query)); + } + + @Test + public void testQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000) + ); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(1_000_000); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + } +} diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index e05caa4336d..f66d577ec12 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -33,6 +33,7 @@ import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.Druids; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import 
io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -101,7 +102,7 @@ public class DataSourceMetadataQueryTest ), Query.class ); - Assert.assertEquals(1, serdeQuery.getContextValue("priority")); + Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 460c9d8c851..53e81977e29 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -32,7 +32,7 @@ import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -43,7 +43,6 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -230,7 +229,7 @@ public class GroupByQueryMergeBufferTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +257,7 @@ public class GroupByQueryMergeBufferTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +296,7 @@ public class GroupByQueryMergeBufferTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +348,7 @@ public class GroupByQueryMergeBufferTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java 
b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index a2f6f6b8d77..060012bf0eb 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -33,7 +33,7 @@ import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; import io.druid.query.InsufficientResourcesException; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; @@ -46,7 +46,6 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; import org.hamcrest.CoreMatchers; import org.junit.Rule; import org.junit.Test; @@ -201,7 +200,7 @@ public class GroupByQueryRunnerFailureTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +238,7 @@ public class GroupByQueryRunnerFailureTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +264,7 @@ public class GroupByQueryRunnerFailureTest .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9d88c8378c6..dfcc6d2b098 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -52,6 +52,7 @@ import io.druid.query.DruidProcessingConfig; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -1213,7 +1214,7 @@ public class GroupByQueryRunnerTest ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of("timeout", Integer.valueOf(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 60000)) .build(); List expectedResults = Arrays.asList( @@ -5421,7 +5422,7 @@ public class GroupByQueryRunnerTest .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) 
.setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of("timeout", 10000)) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 10000)) .build(); List expectedResults = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java index 1c332fb23dc..cad7fe53119 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryContexts; import org.junit.Assert; import org.junit.Test; @@ -78,7 +79,7 @@ public class TimeBoundaryQueryTest ); - Assert.assertEquals(new Integer(1), serdeQuery.getContextValue("priority")); + Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals(true, serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); @@ -116,7 +117,7 @@ public class TimeBoundaryQueryTest ); - Assert.assertEquals("1", serdeQuery.getContextValue("priority")); + Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals("true", serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals("true", serdeQuery.getContextValue("finalize")); diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 6210f2c5ee3..a5bce97fead 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -24,9 +24,9 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.SegmentDescriptor; import org.joda.time.Interval; @@ -123,7 +123,7 @@ public class CacheUtil CacheConfig cacheConfig ) { - return BaseQuery.getContextUseCache(query, true) + return QueryContexts.isUseCache(query) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); @@ -135,7 +135,7 @@ public class CacheUtil CacheConfig cacheConfig ) { - return BaseQuery.getContextPopulateCache(query, true) + return QueryContexts.isPopulateCache(query) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index b15e912fd3f..c86123b0ec8 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -55,10 +55,10 @@ import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import 
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index b15e912fd3f..c86123b0ec8 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -55,10 +55,10 @@ import io.druid.java.util.common.guava.LazySequence;
 import io.druid.java.util.common.guava.MergeSequence;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
-import io.druid.query.BaseQuery;
 import io.druid.query.BySegmentResultValueClass;
 import io.druid.query.CacheStrategy;
 import io.druid.query.Query;
+import io.druid.query.QueryContexts;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.QueryToolChestWarehouse;
@@ -151,12 +151,12 @@ public class CachingClusteredClient implements QueryRunner
     final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
     final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
-    final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
+    final boolean isBySegment = QueryContexts.isBySegment(query);
 
     final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
 
-    final int priority = BaseQuery.getContextPriority(query, 0);
-    contextBuilder.put("priority", priority);
+    final int priority = QueryContexts.getPriority(query);
+    contextBuilder.put(QueryContexts.PRIORITY_KEY, priority);
 
     if (populateCache) {
       // prevent down-stream nodes from caching results as well if we are populating the cache
@@ -177,7 +177,7 @@ public class CachingClusteredClient implements QueryRunner
 
     // Note that enabling this leads to putting uncovered intervals information in the response headers
     // and might blow up in some cases https://github.com/druid-io/druid/issues/2108
-    int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
+    int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
 
     if (uncoveredIntervalsLimit > 0) {
       List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java
index cd6401462a0..d8dc7882a74 100644
--- a/server/src/main/java/io/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/io/druid/client/DirectDruidClient.java
@@ -50,9 +50,9 @@ import io.druid.java.util.common.guava.CloseQuietly;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
-import io.druid.query.BaseQuery;
 import io.druid.query.BySegmentResultValueClass;
 import io.druid.query.Query;
+import io.druid.query.QueryContexts;
 import io.druid.query.QueryInterruptedException;
 import io.druid.query.QueryMetrics;
 import io.druid.query.QueryRunner;
@@ -134,7 +134,7 @@ public class DirectDruidClient implements QueryRunner
   public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
   {
     QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
-    boolean isBySegment = BaseQuery.getContextBySegment(query, false);
+    boolean isBySegment = QueryContexts.isBySegment(query);
 
     Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
     if (types == null) {
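The QueryResource hunk below replaces the inline "default the timeout from maxIdleTime" block with query.withDefaultTimeout(...) driven by the new druid.server.http.defaultQueryTimeout setting. A sketch of the defaulting semantics that replacement is expected to preserve, assuming withDefaultTimeout fills in the timeout only when the context lacks one (matching the inline logic it removes); the helper below is illustrative, not the actual implementation:

```java
import com.google.common.collect.ImmutableMap;
import io.druid.query.Query;
import io.druid.query.QueryContexts;

public class DefaultTimeoutSketch
{
  // Apply the server-wide default (300_000 ms out of the box) only when the
  // query did not set its own timeout; an explicit per-query value always wins.
  public static <T> Query<T> applyDefaultTimeout(Query<T> query, long defaultTimeoutMs)
  {
    if (query.getContextValue(QueryContexts.TIMEOUT_KEY) == null) {
      return query.withOverriddenContext(
          ImmutableMap.<String, Object>of(QueryContexts.TIMEOUT_KEY, defaultTimeoutMs)
      );
    }
    return query;
  }
}
```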
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index 609c7cd53bb..554cc34de83 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -40,7 +40,6 @@ import io.druid.java.util.common.guava.Yielders;
 import io.druid.query.DruidMetrics;
 import io.druid.query.GenericQueryMetricsFactory;
 import io.druid.query.Query;
-import io.druid.query.QueryContextKeys;
 import io.druid.query.QueryInterruptedException;
 import io.druid.query.QueryMetrics;
 import io.druid.query.QuerySegmentWalker;
@@ -177,7 +176,7 @@ public class QueryResource implements QueryCountStatsProvider
   ) throws IOException
   {
     final long startNs = System.nanoTime();
-    Query query = null;
+    Query query = null;
     QueryToolChest toolChest = null;
     String queryId = null;
@@ -191,14 +190,8 @@ public class QueryResource implements QueryCountStatsProvider
         queryId = UUID.randomUUID().toString();
         query = query.withId(queryId);
       }
-      if (query.getContextValue(QueryContextKeys.TIMEOUT) == null) {
-        query = query.withOverriddenContext(
-            ImmutableMap.of(
-                QueryContextKeys.TIMEOUT,
-                config.getMaxIdleTime().toStandardDuration().getMillis()
-            )
-        );
-      }
+      query = query.withDefaultTimeout(config.getDefaultQueryTimeout());
+
       toolChest = warehouse.getToolChest(query);
 
       Thread.currentThread()
diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
index cad5ace2c96..560975abe18 100644
--- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
@@ -37,6 +37,10 @@ public class ServerConfig
   @NotNull
   private Period maxIdleTime = new Period("PT5m");
 
+  @JsonProperty
+  @Min(0)
+  private long defaultQueryTimeout = 300_000; // 5 minutes
+
   public int getNumThreads()
   {
     return numThreads;
@@ -47,12 +51,18 @@ public class ServerConfig
     return maxIdleTime;
   }
 
+  public long getDefaultQueryTimeout()
+  {
+    return defaultQueryTimeout;
+  }
+
   @Override
   public String toString()
   {
     return "ServerConfig{" +
-        "numThreads=" + numThreads +
-        ", maxIdleTime=" + maxIdleTime +
-        '}';
+           "numThreads=" + numThreads +
+           ", maxIdleTime=" + maxIdleTime +
+           ", defaultQueryTimeout=" + defaultQueryTimeout +
+           '}';
   }
 }
diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
index 66164f91c70..c4af26e0b10 100644
--- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
+++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
-import io.druid.query.BaseQuery;
 import io.druid.query.Query;
+import io.druid.query.QueryContexts;
 
 /**
  */
@@ -46,7 +46,7 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto
   @Override
   public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
   {
-    final int priority = BaseQuery.getContextPriority(query, 0);
+    final int priority = QueryContexts.getPriority(query);
     if (priority < minPriority) {
       return Optional.of(