From c1538f29fcd049abde8f4acf0954ce3a80b3585e Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Mon, 13 Nov 2017 17:23:11 -0800 Subject: [PATCH] maxQueryTimeout property in runtime properties. (#4852) * maxQueryTimeout property in runtime properties. * extra line * move withTimeoutAndMaxScatterGatherBytes method to QueryLifeCycle. * Fix initialize method. * remove unused import. * doc update. * some more details in doc about query failure.. * minor fix. * decorating QueryRunner to set and verify context. Added by servers. * remove whitespace. --- docs/content/configuration/broker.md | 1 + docs/content/configuration/historical.md | 1 + .../overlord/ThreadPoolTaskRunner.java | 12 ++- .../indexing/overlord/TaskLifecycleTest.java | 4 +- .../worker/WorkerTaskMonitorTest.java | 4 +- .../java/io/druid/query/QueryContexts.java | 17 +++++ .../io/druid/query/QueryContextsTest.java | 35 +++++++++ .../io/druid/client/DirectDruidClient.java | 15 ---- .../server/ClientQuerySegmentWalker.java | 17 +++-- .../java/io/druid/server/QueryLifecycle.java | 13 +--- .../druid/server/QueryLifecycleFactory.java | 5 -- .../SetAndVerifyContextQueryRunner.java | 73 ++++++++++++++++++ .../server/coordination/ServerManager.java | 74 ++++++++++--------- .../server/initialization/ServerConfig.java | 15 +++- .../io/druid/server/QueryResourceTest.java | 21 ------ .../coordination/ServerManagerTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 2 - 17 files changed, 214 insertions(+), 99 deletions(-) create mode 100644 server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 9989c76a80c..1b3d51184f6 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -44,6 +44,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| +|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE| #### Retry Policy diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 6734386989d..04390a3f468 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -52,6 +52,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| |`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false| |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| +|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE| #### Processing diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index c3cf174cb97..c80925556ce 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -52,6 +52,8 @@ import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; import io.druid.server.DruidNode; +import io.druid.server.SetAndVerifyContextQueryRunner; +import io.druid.server.initialization.ServerConfig; import org.joda.time.Interval; import java.util.Collection; @@ -83,6 +85,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final ServiceEmitter emitter; private final TaskLocation location; + private final ServerConfig serverConfig; private volatile boolean stopping = false; @@ -91,13 +94,15 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker TaskToolboxFactory toolboxFactory, TaskConfig taskConfig, ServiceEmitter emitter, - @Self DruidNode node + @Self DruidNode node, + ServerConfig serverConfig ) { this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); this.taskConfig = taskConfig; this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.location = TaskLocation.create(node.getHost(), node.getPlaintextPort(), node.getTlsPort()); + this.serverConfig = serverConfig; } @Override @@ -362,7 +367,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker } } - return queryRunner == null ? new NoopQueryRunner() : queryRunner; + return new SetAndVerifyContextQueryRunner( + serverConfig, + queryRunner == null ? new NoopQueryRunner() : queryRunner + ); } private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index a15306e2398..23cad066320 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -112,6 +112,7 @@ import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.coordination.ServerType; +import io.druid.server.initialization.ServerConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -622,7 +623,8 @@ public class TaskLifecycleTest tb, taskConfig, emitter, - new DruidNode("dummy", "dummy", 10000, null, true, false) + new DruidNode("dummy", "dummy", 10000, null, true, false), + new ServerConfig() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 4d675fe305d..eae1e4d6363 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -47,6 +47,7 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.DruidNode; import io.druid.server.initialization.IndexerZkConfig; +import io.druid.server.initialization.ServerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; import org.apache.curator.framework.CuratorFramework; @@ -194,7 +195,8 @@ public class WorkerTaskMonitorTest ), taskConfig, new NoopServiceEmitter(), - DUMMY_NODE + DUMMY_NODE, + new ServerConfig() ) ); } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index b56812d8b0e..d0a16fd8784 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -132,6 +132,23 @@ public class QueryContexts } } + public static Query verifyMaxQueryTimeout(Query query, long maxQueryTimeout) + { + long timeout = getTimeout(query); + if (timeout > maxQueryTimeout) { + throw new IAE( + "configured [%s = %s] is more than enforced limit of maxQueryTimeout [%s].", + TIMEOUT_KEY, + timeout, + maxQueryTimeout + ); + } else { + return query; + } + } + + + public static long getMaxScatterGatherBytes(Query query) { return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java index 32050ec9261..1358eada265 100644 --- a/processing/src/test/java/io/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -21,15 +21,20 @@ package io.druid.query; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.Intervals; import io.druid.query.spec.MultipleIntervalSegmentSpec; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.HashMap; public class QueryContextsTest { + @Rule + public final ExpectedException exception = ExpectedException.none(); @Test public void testDefaultQueryTimeout() @@ -72,4 +77,34 @@ public class QueryContextsTest query = QueryContexts.withDefaultTimeout(query, 1_000_000); Assert.assertEquals(1000, QueryContexts.getTimeout(query)); } + + @Test + public void testQueryMaxTimeout() + { + exception.expect(IAE.class); + exception.expectMessage("configured [timeout = 1000] is more than enforced limit of maxQueryTimeout [100]."); + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000) + ); + + QueryContexts.verifyMaxQueryTimeout(query, 100); + } + + @Test + public void testMaxScatterGatherBytes() + { + exception.expect(IAE.class); + exception.expectMessage("configured [maxScatterGatherBytes = 1000] is more than enforced limit of [100]."); + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + ImmutableMap.of(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 1000) + ); + + QueryContexts.withMaxScatterGatherBytes(query, 100); + } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 16d01a6be6b..cd1705dbf3d 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -63,7 +63,6 @@ import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; -import io.druid.server.initialization.ServerConfig; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -116,20 +115,6 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final boolean isSmile; - public static > QueryType withDefaultTimeoutAndMaxScatterGatherBytes( - final QueryType query, - ServerConfig serverConfig - ) - { - return (QueryType) QueryContexts.withMaxScatterGatherBytes( - QueryContexts.withDefaultTimeout( - (Query) query, - serverConfig.getDefaultQueryTimeout() - ), - serverConfig.getMaxScatterGatherBytes() - ); - } - /** * Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}. */ diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 40a5fcbe25c..993747b7a3e 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -34,6 +34,7 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; +import io.druid.server.initialization.ServerConfig; import org.joda.time.Interval; /** @@ -45,6 +46,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final QueryToolChestWarehouse warehouse; private final RetryQueryRunnerConfig retryConfig; private final ObjectMapper objectMapper; + private final ServerConfig serverConfig; @Inject public ClientQuerySegmentWalker( @@ -52,7 +54,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker CachingClusteredClient baseClient, QueryToolChestWarehouse warehouse, RetryQueryRunnerConfig retryConfig, - ObjectMapper objectMapper + ObjectMapper objectMapper, + ServerConfig serverConfig ) { this.emitter = emitter; @@ -60,6 +63,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker this.warehouse = warehouse; this.retryConfig = retryConfig; this.objectMapper = objectMapper; + this.serverConfig = serverConfig; } @Override @@ -86,10 +90,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker return new FluentQueryRunnerBuilder<>(toolChest) .create( - new RetryQueryRunner<>( - baseClientRunner, - retryConfig, - objectMapper + new SetAndVerifyContextQueryRunner( + serverConfig, + new RetryQueryRunner<>( + baseClientRunner, + retryConfig, + objectMapper + ) ) ) .applyPreMergeDecoration() diff --git a/server/src/main/java/io/druid/server/QueryLifecycle.java b/server/src/main/java/io/druid/server/QueryLifecycle.java index e4364c5d56f..1f25f370e27 100644 --- a/server/src/main/java/io/druid/server/QueryLifecycle.java +++ b/server/src/main/java/io/druid/server/QueryLifecycle.java @@ -38,7 +38,6 @@ import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; -import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.security.Access; import io.druid.server.security.AuthenticationResult; @@ -58,7 +57,7 @@ import java.util.concurrent.TimeUnit; * *
    *
  1. Initialization ({@link #initialize(Query)})
  2. - *
  3. Authorization ({@link #authorize(String, String, HttpServletRequest)}
  4. + *
  5. Authorization ({@link #authorize(HttpServletRequest)}
  6. *
  7. Execution ({@link #execute()}
  8. *
  9. Logging ({@link #emitLogsAndMetrics(Throwable, String, long)}
  10. *
@@ -74,7 +73,6 @@ public class QueryLifecycle private final GenericQueryMetricsFactory queryMetricsFactory; private final ServiceEmitter emitter; private final RequestLogger requestLogger; - private final ServerConfig serverConfig; private final AuthorizerMapper authorizerMapper; private final long startMs; private final long startNs; @@ -90,7 +88,6 @@ public class QueryLifecycle final GenericQueryMetricsFactory queryMetricsFactory, final ServiceEmitter emitter, final RequestLogger requestLogger, - final ServerConfig serverConfig, final AuthorizerMapper authorizerMapper, final long startMs, final long startNs @@ -101,7 +98,6 @@ public class QueryLifecycle this.queryMetricsFactory = queryMetricsFactory; this.emitter = emitter; this.requestLogger = requestLogger; - this.serverConfig = serverConfig; this.authorizerMapper = authorizerMapper; this.startMs = startMs; this.startNs = startNs; @@ -171,12 +167,7 @@ public class QueryLifecycle queryId = UUID.randomUUID().toString(); } - this.queryPlus = QueryPlus.wrap( - (Query) DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - baseQuery.withId(queryId), - serverConfig - ) - ); + this.queryPlus = QueryPlus.wrap(baseQuery.withId(queryId)); this.toolChest = warehouse.getToolChest(baseQuery); } diff --git a/server/src/main/java/io/druid/server/QueryLifecycleFactory.java b/server/src/main/java/io/druid/server/QueryLifecycleFactory.java index 745d23bb5c9..9b8b1c5f147 100644 --- a/server/src/main/java/io/druid/server/QueryLifecycleFactory.java +++ b/server/src/main/java/io/druid/server/QueryLifecycleFactory.java @@ -25,7 +25,6 @@ import io.druid.guice.LazySingleton; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; -import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.security.AuthConfig; import io.druid.server.security.AuthorizerMapper; @@ -38,7 +37,6 @@ public class QueryLifecycleFactory private final GenericQueryMetricsFactory queryMetricsFactory; private final ServiceEmitter emitter; private final RequestLogger requestLogger; - private final ServerConfig serverConfig; private final AuthorizerMapper authorizerMapper; @Inject @@ -48,7 +46,6 @@ public class QueryLifecycleFactory final GenericQueryMetricsFactory queryMetricsFactory, final ServiceEmitter emitter, final RequestLogger requestLogger, - final ServerConfig serverConfig, final AuthConfig authConfig, final AuthorizerMapper authorizerMapper ) @@ -58,7 +55,6 @@ public class QueryLifecycleFactory this.queryMetricsFactory = queryMetricsFactory; this.emitter = emitter; this.requestLogger = requestLogger; - this.serverConfig = serverConfig; this.authorizerMapper = authorizerMapper; } @@ -70,7 +66,6 @@ public class QueryLifecycleFactory queryMetricsFactory, emitter, requestLogger, - serverConfig, authorizerMapper, System.currentTimeMillis(), System.nanoTime() diff --git a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java new file mode 100644 index 00000000000..637b9dd14fb --- /dev/null +++ b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java @@ -0,0 +1,73 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import io.druid.java.util.common.guava.Sequence; +import io.druid.query.Query; +import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; +import io.druid.query.QueryRunner; +import io.druid.server.initialization.ServerConfig; + +import java.util.Map; + +/** + * Use this QueryRunner to set and verify Query contexts. + */ +public class SetAndVerifyContextQueryRunner implements QueryRunner +{ + private final ServerConfig serverConfig; + private final QueryRunner baseRunner; + + public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner) + { + this.serverConfig = serverConfig; + this.baseRunner = baseRunner; + } + + @Override + public Sequence run(QueryPlus queryPlus, Map responseContext) + { + return baseRunner.run( + QueryPlus.wrap((Query) withTimeoutAndMaxScatterGatherBytes( + queryPlus.getQuery(), + serverConfig + )), + responseContext + ); + } + + public static > QueryType withTimeoutAndMaxScatterGatherBytes( + final QueryType query, + ServerConfig serverConfig + ) + { + return (QueryType) QueryContexts.verifyMaxQueryTimeout( + QueryContexts.withMaxScatterGatherBytes( + QueryContexts.withDefaultTimeout( + (Query) query, + Math.min(serverConfig.getDefaultQueryTimeout(), serverConfig.getMaxQueryTimeout()) + ), + serverConfig.getMaxScatterGatherBytes() + ), + serverConfig.getMaxQueryTimeout() + ); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 9b906325cba..39da89d03fd 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -54,6 +54,8 @@ import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; import io.druid.server.SegmentManager; +import io.druid.server.SetAndVerifyContextQueryRunner; +import io.druid.server.initialization.ServerConfig; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -79,6 +81,7 @@ public class ServerManager implements QuerySegmentWalker private final ObjectMapper objectMapper; private final CacheConfig cacheConfig; private final SegmentManager segmentManager; + private final ServerConfig serverConfig; @Inject public ServerManager( @@ -89,7 +92,8 @@ public class ServerManager implements QuerySegmentWalker @Smile ObjectMapper objectMapper, Cache cache, CacheConfig cacheConfig, - SegmentManager segmentManager + SegmentManager segmentManager, + ServerConfig serverConfig ) { this.conglomerate = conglomerate; @@ -102,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker this.cacheConfig = cacheConfig; this.segmentManager = segmentManager; + this.serverConfig = serverConfig; } @Override @@ -275,40 +280,43 @@ public class ServerManager implements QuerySegmentWalker { SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); String segmentId = adapter.getIdentifier(); - return CPUTimeMetricQueryRunner.safeBuild( - new SpecificSegmentQueryRunner( - new MetricsEmittingQueryRunner( - emitter, - toolChest, - new BySegmentQueryRunner( - segmentId, - adapter.getDataInterval().getStart(), - new CachingQueryRunner( + return new SetAndVerifyContextQueryRunner( + serverConfig, + CPUTimeMetricQueryRunner.safeBuild( + new SpecificSegmentQueryRunner( + new MetricsEmittingQueryRunner( + emitter, + toolChest, + new BySegmentQueryRunner( segmentId, - segmentDescriptor, - objectMapper, - cache, - toolChest, - new MetricsEmittingQueryRunner( - emitter, + adapter.getDataInterval().getStart(), + new CachingQueryRunner( + segmentId, + segmentDescriptor, + objectMapper, + cache, toolChest, - new ReferenceCountingSegmentQueryRunner(factory, adapter, segmentDescriptor), - QueryMetrics::reportSegmentTime, - queryMetrics -> queryMetrics.segment(segmentId) - ), - cachingExec, - cacheConfig - ) - ), - QueryMetrics::reportSegmentAndCacheTime, - queryMetrics -> queryMetrics.segment(segmentId) - ).withWaitMeasuredFromNow(), - segmentSpec - ), - toolChest, - emitter, - cpuTimeAccumulator, - false + new MetricsEmittingQueryRunner( + emitter, + toolChest, + new ReferenceCountingSegmentQueryRunner(factory, adapter, segmentDescriptor), + QueryMetrics::reportSegmentTime, + queryMetrics -> queryMetrics.segment(segmentId) + ), + cachingExec, + cacheConfig + ) + ), + QueryMetrics::reportSegmentAndCacheTime, + queryMetrics -> queryMetrics.segment(segmentId) + ).withWaitMeasuredFromNow(), + segmentSpec + ), + toolChest, + emitter, + cpuTimeAccumulator, + false + ) ); } } diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index d5db2cd2c53..c64a82b002d 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -53,6 +53,10 @@ public class ServerConfig @Min(1) private long maxScatterGatherBytes = Long.MAX_VALUE; + @JsonProperty + @Min(1) + private long maxQueryTimeout = Long.MAX_VALUE; + public int getNumThreads() { return numThreads; @@ -83,6 +87,11 @@ public class ServerConfig return maxScatterGatherBytes; } + public long getMaxQueryTimeout() + { + return maxQueryTimeout; + } + @Override public boolean equals(Object o) { @@ -98,7 +107,8 @@ public class ServerConfig enableRequestLimit == that.enableRequestLimit && defaultQueryTimeout == that.defaultQueryTimeout && maxScatterGatherBytes == that.maxScatterGatherBytes && - Objects.equals(maxIdleTime, that.maxIdleTime); + Objects.equals(maxIdleTime, that.maxIdleTime) && + maxQueryTimeout == that.maxQueryTimeout; } @Override @@ -110,7 +120,8 @@ public class ServerConfig enableRequestLimit, maxIdleTime, defaultQueryTimeout, - maxScatterGatherBytes + maxScatterGatherBytes, + maxQueryTimeout ); } } diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index 8a3c6147e2d..15742dc786e 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -42,7 +42,6 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.query.timeboundary.TimeBoundaryResultValue; -import io.druid.server.initialization.ServerConfig; import io.druid.server.log.TestRequestLogger; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.security.Access; @@ -56,7 +55,6 @@ import io.druid.server.security.ForbiddenException; import io.druid.server.security.Resource; import org.easymock.EasyMock; import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -84,21 +82,6 @@ public class QueryResourceTest private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null); - - public static final ServerConfig serverConfig = new ServerConfig() - { - @Override - public int getNumThreads() - { - return 1; - } - - @Override - public Period getMaxIdleTime() - { - return Period.seconds(1); - } - }; private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class); public static final QuerySegmentWalker testSegmentWalker = new QuerySegmentWalker() { @@ -154,7 +137,6 @@ public class QueryResourceTest new DefaultGenericQueryMetricsFactory(jsonMapper), new NoopServiceEmitter(), testRequestLogger, - serverConfig, new AuthConfig(), AuthTestUtils.TEST_AUTHORIZER_MAPPER ), @@ -265,7 +247,6 @@ public class QueryResourceTest new DefaultGenericQueryMetricsFactory(jsonMapper), new NoopServiceEmitter(), testRequestLogger, - serverConfig, new AuthConfig(null, null, null), authMapper ), @@ -373,7 +354,6 @@ public class QueryResourceTest new DefaultGenericQueryMetricsFactory(jsonMapper), new NoopServiceEmitter(), testRequestLogger, - serverConfig, new AuthConfig(null, null, null), authMapper ), @@ -495,7 +475,6 @@ public class QueryResourceTest new DefaultGenericQueryMetricsFactory(jsonMapper), new NoopServiceEmitter(), testRequestLogger, - serverConfig, new AuthConfig(null, null, null), authMapper ), diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index e2fcbe53baf..e53e756f094 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -66,6 +66,7 @@ import io.druid.segment.StorageAdapter; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; import io.druid.server.SegmentManager; +import io.druid.server.initialization.ServerConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -155,7 +156,8 @@ public class ServerManagerTest new DefaultObjectMapper(), new LocalCacheProvider().get(), new CacheConfig(), - segmentManager + segmentManager, + new ServerConfig() ); loadQueryable("test", "1", Intervals.of("P1d/2011-04-01")); diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 6ae798e3246..df96dd8bde9 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -92,7 +92,6 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.server.QueryLifecycleFactory; -import io.druid.server.initialization.ServerConfig; import io.druid.server.log.NoopRequestLogger; import io.druid.server.security.Access; import io.druid.server.security.Action; @@ -402,7 +401,6 @@ public class CalciteTests new DefaultGenericQueryMetricsFactory(INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class))), new ServiceEmitter("dummy", "dummy", new NoopEmitter()), new NoopRequestLogger(), - new ServerConfig(), new AuthConfig(), TEST_AUTHORIZER_MAPPER );