maxQueryTimeout property in runtime properties. (#4852)

* maxQueryTimeout property in runtime properties.

* extra line

* move withTimeoutAndMaxScatterGatherBytes method to QueryLifeCycle.

* Fix initialize method.

* remove unused import.

* doc update.

* some more details in doc about query failure..

* minor fix.

* decorating QueryRunner to set and verify context. Added by servers.

* remove whitespace.
This commit is contained in:
Akash Dwivedi 2017-11-13 17:23:11 -08:00 committed by Himanshu
parent 819700cbc5
commit c1538f29fc
17 changed files with 214 additions and 99 deletions

View File

@ -44,6 +44,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
#### Retry Policy #### Retry Policy

View File

@ -52,6 +52,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false| |`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
#### Processing #### Processing

View File

@ -52,6 +52,8 @@ import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.SetAndVerifyContextQueryRunner;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Collection; import java.util.Collection;
@ -83,6 +85,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final TaskLocation location; private final TaskLocation location;
private final ServerConfig serverConfig;
private volatile boolean stopping = false; private volatile boolean stopping = false;
@ -91,13 +94,15 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
TaskToolboxFactory toolboxFactory, TaskToolboxFactory toolboxFactory,
TaskConfig taskConfig, TaskConfig taskConfig,
ServiceEmitter emitter, ServiceEmitter emitter,
@Self DruidNode node @Self DruidNode node,
ServerConfig serverConfig
) )
{ {
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig; this.taskConfig = taskConfig;
this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.location = TaskLocation.create(node.getHost(), node.getPlaintextPort(), node.getTlsPort()); this.location = TaskLocation.create(node.getHost(), node.getPlaintextPort(), node.getTlsPort());
this.serverConfig = serverConfig;
} }
@Override @Override
@ -362,7 +367,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
} }
} }
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner; return new SetAndVerifyContextQueryRunner(
serverConfig,
queryRunner == null ? new NoopQueryRunner<T>() : queryRunner
);
} }
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem

View File

@ -112,6 +112,7 @@ import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType; import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
@ -622,7 +623,8 @@ public class TaskLifecycleTest
tb, tb,
taskConfig, taskConfig,
emitter, emitter,
new DruidNode("dummy", "dummy", 10000, null, true, false) new DruidNode("dummy", "dummy", 10000, null, true, false),
new ServerConfig()
); );
} }

View File

@ -47,6 +47,7 @@ import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -194,7 +195,8 @@ public class WorkerTaskMonitorTest
), ),
taskConfig, taskConfig,
new NoopServiceEmitter(), new NoopServiceEmitter(),
DUMMY_NODE DUMMY_NODE,
new ServerConfig()
) )
); );
} }

View File

@ -132,6 +132,23 @@ public class QueryContexts
} }
} }
public static <T> Query<T> verifyMaxQueryTimeout(Query<T> query, long maxQueryTimeout)
{
long timeout = getTimeout(query);
if (timeout > maxQueryTimeout) {
throw new IAE(
"configured [%s = %s] is more than enforced limit of maxQueryTimeout [%s].",
TIMEOUT_KEY,
timeout,
maxQueryTimeout
);
} else {
return query;
}
}
public static <T> long getMaxScatterGatherBytes(Query<T> query) public static <T> long getMaxScatterGatherBytes(Query<T> query)
{ {
return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);

View File

@ -21,15 +21,20 @@ package io.druid.query;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap; import java.util.HashMap;
public class QueryContextsTest public class QueryContextsTest
{ {
@Rule
public final ExpectedException exception = ExpectedException.none();
@Test @Test
public void testDefaultQueryTimeout() public void testDefaultQueryTimeout()
@ -72,4 +77,34 @@ public class QueryContextsTest
query = QueryContexts.withDefaultTimeout(query, 1_000_000); query = QueryContexts.withDefaultTimeout(query, 1_000_000);
Assert.assertEquals(1000, QueryContexts.getTimeout(query)); Assert.assertEquals(1000, QueryContexts.getTimeout(query));
} }
@Test
public void testQueryMaxTimeout()
{
exception.expect(IAE.class);
exception.expectMessage("configured [timeout = 1000] is more than enforced limit of maxQueryTimeout [100].");
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000)
);
QueryContexts.verifyMaxQueryTimeout(query, 100);
}
@Test
public void testMaxScatterGatherBytes()
{
exception.expect(IAE.class);
exception.expectMessage("configured [maxScatterGatherBytes = 1000] is more than enforced limit of [100].");
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
ImmutableMap.of(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 1000)
);
QueryContexts.withMaxScatterGatherBytes(query, 100);
}
} }

View File

@ -63,7 +63,6 @@ import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException; import io.druid.query.ResourceLimitExceededException;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns; import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.server.initialization.ServerConfig;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunk;
@ -116,20 +115,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final AtomicInteger openConnections; private final AtomicInteger openConnections;
private final boolean isSmile; private final boolean isSmile;
public static <T, QueryType extends Query<T>> QueryType withDefaultTimeoutAndMaxScatterGatherBytes(
final QueryType query,
ServerConfig serverConfig
)
{
return (QueryType) QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
(Query) query,
serverConfig.getDefaultQueryTimeout()
),
serverConfig.getMaxScatterGatherBytes()
);
}
/** /**
* Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}. * Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}.
*/ */

View File

@ -34,6 +34,7 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval; import org.joda.time.Interval;
/** /**
@ -45,6 +46,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final QueryToolChestWarehouse warehouse; private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig; private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final ServerConfig serverConfig;
@Inject @Inject
public ClientQuerySegmentWalker( public ClientQuerySegmentWalker(
@ -52,7 +54,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
CachingClusteredClient baseClient, CachingClusteredClient baseClient,
QueryToolChestWarehouse warehouse, QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig, RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper ObjectMapper objectMapper,
ServerConfig serverConfig
) )
{ {
this.emitter = emitter; this.emitter = emitter;
@ -60,6 +63,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
this.warehouse = warehouse; this.warehouse = warehouse;
this.retryConfig = retryConfig; this.retryConfig = retryConfig;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.serverConfig = serverConfig;
} }
@Override @Override
@ -86,10 +90,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return new FluentQueryRunnerBuilder<>(toolChest) return new FluentQueryRunnerBuilder<>(toolChest)
.create( .create(
new RetryQueryRunner<>( new SetAndVerifyContextQueryRunner(
baseClientRunner, serverConfig,
retryConfig, new RetryQueryRunner<>(
objectMapper baseClientRunner,
retryConfig,
objectMapper
)
) )
) )
.applyPreMergeDecoration() .applyPreMergeDecoration()

View File

@ -38,7 +38,6 @@ import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger; import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access; import io.druid.server.security.Access;
import io.druid.server.security.AuthenticationResult; import io.druid.server.security.AuthenticationResult;
@ -58,7 +57,7 @@ import java.util.concurrent.TimeUnit;
* *
* <ol> * <ol>
* <li>Initialization ({@link #initialize(Query)})</li> * <li>Initialization ({@link #initialize(Query)})</li>
* <li>Authorization ({@link #authorize(String, String, HttpServletRequest)}</li> * <li>Authorization ({@link #authorize(HttpServletRequest)}</li>
* <li>Execution ({@link #execute()}</li> * <li>Execution ({@link #execute()}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)}</li> * <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)}</li>
* </ol> * </ol>
@ -74,7 +73,6 @@ public class QueryLifecycle
private final GenericQueryMetricsFactory queryMetricsFactory; private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final RequestLogger requestLogger; private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthorizerMapper authorizerMapper; private final AuthorizerMapper authorizerMapper;
private final long startMs; private final long startMs;
private final long startNs; private final long startNs;
@ -90,7 +88,6 @@ public class QueryLifecycle
final GenericQueryMetricsFactory queryMetricsFactory, final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter, final ServiceEmitter emitter,
final RequestLogger requestLogger, final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthorizerMapper authorizerMapper, final AuthorizerMapper authorizerMapper,
final long startMs, final long startMs,
final long startNs final long startNs
@ -101,7 +98,6 @@ public class QueryLifecycle
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter; this.emitter = emitter;
this.requestLogger = requestLogger; this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authorizerMapper = authorizerMapper; this.authorizerMapper = authorizerMapper;
this.startMs = startMs; this.startMs = startMs;
this.startNs = startNs; this.startNs = startNs;
@ -171,12 +167,7 @@ public class QueryLifecycle
queryId = UUID.randomUUID().toString(); queryId = UUID.randomUUID().toString();
} }
this.queryPlus = QueryPlus.wrap( this.queryPlus = QueryPlus.wrap(baseQuery.withId(queryId));
(Query) DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery.withId(queryId),
serverConfig
)
);
this.toolChest = warehouse.getToolChest(baseQuery); this.toolChest = warehouse.getToolChest(baseQuery);
} }

View File

@ -25,7 +25,6 @@ import io.druid.guice.LazySingleton;
import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger; import io.druid.server.log.RequestLogger;
import io.druid.server.security.AuthConfig; import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthorizerMapper; import io.druid.server.security.AuthorizerMapper;
@ -38,7 +37,6 @@ public class QueryLifecycleFactory
private final GenericQueryMetricsFactory queryMetricsFactory; private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final RequestLogger requestLogger; private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthorizerMapper authorizerMapper; private final AuthorizerMapper authorizerMapper;
@Inject @Inject
@ -48,7 +46,6 @@ public class QueryLifecycleFactory
final GenericQueryMetricsFactory queryMetricsFactory, final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter, final ServiceEmitter emitter,
final RequestLogger requestLogger, final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthConfig authConfig, final AuthConfig authConfig,
final AuthorizerMapper authorizerMapper final AuthorizerMapper authorizerMapper
) )
@ -58,7 +55,6 @@ public class QueryLifecycleFactory
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter; this.emitter = emitter;
this.requestLogger = requestLogger; this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authorizerMapper = authorizerMapper; this.authorizerMapper = authorizerMapper;
} }
@ -70,7 +66,6 @@ public class QueryLifecycleFactory
queryMetricsFactory, queryMetricsFactory,
emitter, emitter,
requestLogger, requestLogger,
serverConfig,
authorizerMapper, authorizerMapper,
System.currentTimeMillis(), System.currentTimeMillis(),
System.nanoTime() System.nanoTime()

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.server.initialization.ServerConfig;
import java.util.Map;
/**
* Use this QueryRunner to set and verify Query contexts.
*/
public class SetAndVerifyContextQueryRunner implements QueryRunner
{
private final ServerConfig serverConfig;
private final QueryRunner baseRunner;
public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner)
{
this.serverConfig = serverConfig;
this.baseRunner = baseRunner;
}
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return baseRunner.run(
QueryPlus.wrap((Query) withTimeoutAndMaxScatterGatherBytes(
queryPlus.getQuery(),
serverConfig
)),
responseContext
);
}
public static <T, QueryType extends Query<T>> QueryType withTimeoutAndMaxScatterGatherBytes(
final QueryType query,
ServerConfig serverConfig
)
{
return (QueryType) QueryContexts.verifyMaxQueryTimeout(
QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
(Query) query,
Math.min(serverConfig.getDefaultQueryTimeout(), serverConfig.getMaxQueryTimeout())
),
serverConfig.getMaxScatterGatherBytes()
),
serverConfig.getMaxQueryTimeout()
);
}
}

View File

@ -54,6 +54,8 @@ import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec; import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.ReferenceCountingSegment;
import io.druid.server.SegmentManager; import io.druid.server.SegmentManager;
import io.druid.server.SetAndVerifyContextQueryRunner;
import io.druid.server.initialization.ServerConfig;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
@ -79,6 +81,7 @@ public class ServerManager implements QuerySegmentWalker
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig; private final CacheConfig cacheConfig;
private final SegmentManager segmentManager; private final SegmentManager segmentManager;
private final ServerConfig serverConfig;
@Inject @Inject
public ServerManager( public ServerManager(
@ -89,7 +92,8 @@ public class ServerManager implements QuerySegmentWalker
@Smile ObjectMapper objectMapper, @Smile ObjectMapper objectMapper,
Cache cache, Cache cache,
CacheConfig cacheConfig, CacheConfig cacheConfig,
SegmentManager segmentManager SegmentManager segmentManager,
ServerConfig serverConfig
) )
{ {
this.conglomerate = conglomerate; this.conglomerate = conglomerate;
@ -102,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker
this.cacheConfig = cacheConfig; this.cacheConfig = cacheConfig;
this.segmentManager = segmentManager; this.segmentManager = segmentManager;
this.serverConfig = serverConfig;
} }
@Override @Override
@ -275,40 +280,43 @@ public class ServerManager implements QuerySegmentWalker
{ {
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
String segmentId = adapter.getIdentifier(); String segmentId = adapter.getIdentifier();
return CPUTimeMetricQueryRunner.safeBuild( return new SetAndVerifyContextQueryRunner(
new SpecificSegmentQueryRunner<T>( serverConfig,
new MetricsEmittingQueryRunner<T>( CPUTimeMetricQueryRunner.safeBuild(
emitter, new SpecificSegmentQueryRunner<T>(
toolChest, new MetricsEmittingQueryRunner<T>(
new BySegmentQueryRunner<T>( emitter,
segmentId, toolChest,
adapter.getDataInterval().getStart(), new BySegmentQueryRunner<T>(
new CachingQueryRunner<T>(
segmentId, segmentId,
segmentDescriptor, adapter.getDataInterval().getStart(),
objectMapper, new CachingQueryRunner<T>(
cache, segmentId,
toolChest, segmentDescriptor,
new MetricsEmittingQueryRunner<T>( objectMapper,
emitter, cache,
toolChest, toolChest,
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor), new MetricsEmittingQueryRunner<T>(
QueryMetrics::reportSegmentTime, emitter,
queryMetrics -> queryMetrics.segment(segmentId) toolChest,
), new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
cachingExec, QueryMetrics::reportSegmentTime,
cacheConfig queryMetrics -> queryMetrics.segment(segmentId)
) ),
), cachingExec,
QueryMetrics::reportSegmentAndCacheTime, cacheConfig
queryMetrics -> queryMetrics.segment(segmentId) )
).withWaitMeasuredFromNow(), ),
segmentSpec QueryMetrics::reportSegmentAndCacheTime,
), queryMetrics -> queryMetrics.segment(segmentId)
toolChest, ).withWaitMeasuredFromNow(),
emitter, segmentSpec
cpuTimeAccumulator, ),
false toolChest,
emitter,
cpuTimeAccumulator,
false
)
); );
} }
} }

View File

@ -53,6 +53,10 @@ public class ServerConfig
@Min(1) @Min(1)
private long maxScatterGatherBytes = Long.MAX_VALUE; private long maxScatterGatherBytes = Long.MAX_VALUE;
@JsonProperty
@Min(1)
private long maxQueryTimeout = Long.MAX_VALUE;
public int getNumThreads() public int getNumThreads()
{ {
return numThreads; return numThreads;
@ -83,6 +87,11 @@ public class ServerConfig
return maxScatterGatherBytes; return maxScatterGatherBytes;
} }
public long getMaxQueryTimeout()
{
return maxQueryTimeout;
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -98,7 +107,8 @@ public class ServerConfig
enableRequestLimit == that.enableRequestLimit && enableRequestLimit == that.enableRequestLimit &&
defaultQueryTimeout == that.defaultQueryTimeout && defaultQueryTimeout == that.defaultQueryTimeout &&
maxScatterGatherBytes == that.maxScatterGatherBytes && maxScatterGatherBytes == that.maxScatterGatherBytes &&
Objects.equals(maxIdleTime, that.maxIdleTime); Objects.equals(maxIdleTime, that.maxIdleTime) &&
maxQueryTimeout == that.maxQueryTimeout;
} }
@Override @Override
@ -110,7 +120,8 @@ public class ServerConfig
enableRequestLimit, enableRequestLimit,
maxIdleTime, maxIdleTime,
defaultQueryTimeout, defaultQueryTimeout,
maxScatterGatherBytes maxScatterGatherBytes,
maxQueryTimeout
); );
} }
} }

View File

@ -42,7 +42,6 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.timeboundary.TimeBoundaryResultValue; import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.TestRequestLogger; import io.druid.server.log.TestRequestLogger;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.security.Access; import io.druid.server.security.Access;
@ -56,7 +55,6 @@ import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource; import io.druid.server.security.Resource;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -84,21 +82,6 @@ public class QueryResourceTest
private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null); private static final AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null);
public static final ServerConfig serverConfig = new ServerConfig()
{
@Override
public int getNumThreads()
{
return 1;
}
@Override
public Period getMaxIdleTime()
{
return Period.seconds(1);
}
};
private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class); private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class);
public static final QuerySegmentWalker testSegmentWalker = new QuerySegmentWalker() public static final QuerySegmentWalker testSegmentWalker = new QuerySegmentWalker()
{ {
@ -154,7 +137,6 @@ public class QueryResourceTest
new DefaultGenericQueryMetricsFactory(jsonMapper), new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(), new NoopServiceEmitter(),
testRequestLogger, testRequestLogger,
serverConfig,
new AuthConfig(), new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER AuthTestUtils.TEST_AUTHORIZER_MAPPER
), ),
@ -265,7 +247,6 @@ public class QueryResourceTest
new DefaultGenericQueryMetricsFactory(jsonMapper), new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(), new NoopServiceEmitter(),
testRequestLogger, testRequestLogger,
serverConfig,
new AuthConfig(null, null, null), new AuthConfig(null, null, null),
authMapper authMapper
), ),
@ -373,7 +354,6 @@ public class QueryResourceTest
new DefaultGenericQueryMetricsFactory(jsonMapper), new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(), new NoopServiceEmitter(),
testRequestLogger, testRequestLogger,
serverConfig,
new AuthConfig(null, null, null), new AuthConfig(null, null, null),
authMapper authMapper
), ),
@ -495,7 +475,6 @@ public class QueryResourceTest
new DefaultGenericQueryMetricsFactory(jsonMapper), new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(), new NoopServiceEmitter(),
testRequestLogger, testRequestLogger,
serverConfig,
new AuthConfig(null, null, null), new AuthConfig(null, null, null),
authMapper authMapper
), ),

View File

@ -66,6 +66,7 @@ import io.druid.segment.StorageAdapter;
import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.SegmentManager; import io.druid.server.SegmentManager;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
@ -155,7 +156,8 @@ public class ServerManagerTest
new DefaultObjectMapper(), new DefaultObjectMapper(),
new LocalCacheProvider().get(), new LocalCacheProvider().get(),
new CacheConfig(), new CacheConfig(),
segmentManager segmentManager,
new ServerConfig()
); );
loadQueryable("test", "1", Intervals.of("P1d/2011-04-01")); loadQueryable("test", "1", Intervals.of("P1d/2011-04-01"));

View File

@ -92,7 +92,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper; import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.server.QueryLifecycleFactory; import io.druid.server.QueryLifecycleFactory;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.NoopRequestLogger; import io.druid.server.log.NoopRequestLogger;
import io.druid.server.security.Access; import io.druid.server.security.Access;
import io.druid.server.security.Action; import io.druid.server.security.Action;
@ -402,7 +401,6 @@ public class CalciteTests
new DefaultGenericQueryMetricsFactory(INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class))), new DefaultGenericQueryMetricsFactory(INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class))),
new ServiceEmitter("dummy", "dummy", new NoopEmitter()), new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(), new NoopRequestLogger(),
new ServerConfig(),
new AuthConfig(), new AuthConfig(),
TEST_AUTHORIZER_MAPPER TEST_AUTHORIZER_MAPPER
); );