improve query timeout handling and limit max scatter-gather bytes (#4229)

* improve query timeout handling and limit max scatter-gather bytes

* address review comments
Himanshu, 2017-05-16 12:47:32 -05:00 (committed by GitHub)
parent e823085866
commit 136b2fae72
8 changed files with 221 additions and 45 deletions

File: Broker configuration documentation

@@ -37,6 +37,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
 |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
 |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
+|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advanced configuration that guards against OOMs when the Broker is under heavy load and cannot consume the gathered data from memory fast enough. The limit can be further reduced at query time using `maxScatterGatherBytes` in the query context. Note that a large limit is not necessarily harmful if the Broker is never under heavy concurrent load, since gathered data is then processed quickly and its memory freed.|Long.MAX_VALUE|
 |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
 |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
 |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
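
For illustration, a hypothetical Broker `runtime.properties` excerpt exercising both settings; the specific values are examples, not recommendations:

```properties
# Cancel queries that are still unfinished after 60 seconds (default: 300000 ms).
druid.server.http.defaultQueryTimeout=60000
# Fail a query once more than ~512 MB has been gathered from data nodes (default: Long.MAX_VALUE).
druid.server.http.maxScatterGatherBytes=536870912
```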

File: Query context documentation

@@ -10,6 +10,7 @@ The query context is used for various query configuration parameters. The follow
 |property |default | description |
 |-----------------|----------------------------------------|----------------------|
 |timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) |
+|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can only further reduce the `maxScatterGatherBytes` limit at query time. See [broker configuration](broker.html) for more details.|
 |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
 |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
 |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
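
As a sketch, a hypothetical timeseries query that tightens both limits through the context (the datasource, interval, and values are made up):

```json
{
  "queryType": "timeseries",
  "dataSource": "example_datasource",
  "granularity": "all",
  "intervals": ["2017-01-01/2017-02-01"],
  "aggregations": [{"type": "count", "name": "rows"}],
  "context": {
    "timeout": 30000,
    "maxScatterGatherBytes": 134217728
  }
}
```

Requesting a context value larger than the server-side limit is rejected with an error rather than silently clamped (see `QueryContexts.withMaxScatterGatherBytes` below).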

File: QueryContexts.java

@@ -21,12 +21,14 @@ package io.druid.query;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
 
 public class QueryContexts
 {
   public static final String PRIORITY_KEY = "priority";
   public static final String TIMEOUT_KEY = "timeout";
+  public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
   public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
   public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
@@ -98,6 +100,31 @@ public class QueryContexts
     return query.getContextValue(CHUNK_PERIOD_KEY, "P0D");
   }
 
+  public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
+  {
+    Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
+    if (obj == null) {
+      return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit));
+    } else {
+      long curr = ((Number) obj).longValue();
+      if (curr > maxScatterGatherBytesLimit) {
+        throw new IAE(
+            "configured [%s = %s] is more than enforced limit of [%s].",
+            MAX_SCATTER_GATHER_BYTES_KEY,
+            curr,
+            maxScatterGatherBytesLimit
+        );
+      } else {
+        return query;
+      }
+    }
+  }
+
+  public static <T> long getMaxScatterGatherBytes(Query<T> query)
+  {
+    return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
+  }
+
   public static <T> boolean hasTimeout(Query<T> query)
   {
     return getTimeout(query) != NO_TIMEOUT;
@@ -115,6 +142,11 @@ public class QueryContexts
     return timeout;
   }
 
+  public static <T> Query<T> withTimeout(Query<T> query, long timeout)
+  {
+    return query.withOverriddenContext(ImmutableMap.of(TIMEOUT_KEY, timeout));
+  }
+
   public static <T> Query<T> withDefaultTimeout(Query<T> query, long defaultTimeout)
   {
     return query.withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout));
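
A minimal usage sketch of the enforcement semantics above (the wrapper class and variable names are hypothetical; it assumes a Druid `Query<T>` built elsewhere): the server-configured value acts as a ceiling that a query context may tighten but never raise.

```java
import io.druid.query.Query;
import io.druid.query.QueryContexts;

public class ScatterGatherLimitExample
{
  // Hypothetical helper: stamp the server-enforced ceiling onto an incoming query.
  // - No context value set: the server limit is written into the query context.
  // - Context value <= server limit: the query is returned unchanged.
  // - Context value > server limit: an IAE is thrown and the query is rejected.
  public static <T> Query<T> enforce(Query<T> query, long serverLimit)
  {
    return QueryContexts.withMaxScatterGatherBytes(query, serverLimit);
  }
}
```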

File: ResourceLimitExceededException.java

@@ -19,6 +19,8 @@
 
 package io.druid.query;
 
+import io.druid.common.utils.StringUtils;
+
 /**
  * Exception indicating that an operation failed because it exceeded some configured resource limit.
  *
@@ -27,8 +29,8 @@ package io.druid.query;
  */
 public class ResourceLimitExceededException extends RuntimeException
 {
-  public ResourceLimitExceededException(String message)
+  public ResourceLimitExceededException(String message, Object... arguments)
   {
-    super(message);
+    super(StringUtils.safeFormat(message, arguments));
   }
 }
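
A hypothetical call site for the new varargs constructor; `StringUtils.safeFormat` formats the message without throwing even if the arguments are malformed, which matters in failure paths:

```java
// Sketch only: queryId, bytesGathered, and maxBytes are illustrative locals.
static void checkLimit(String queryId, long bytesGathered, long maxBytes)
{
  if (bytesGathered > maxBytes) {
    throw new ResourceLimitExceededException(
        "query[%s] gathered [%s] bytes, exceeding limit [%s]",
        queryId, bytesGathered, maxBytes
    );
  }
}
```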

File: DirectDruidClient.java

@@ -42,6 +42,7 @@ import com.metamx.http.client.response.ClientResponse;
 import com.metamx.http.client.response.HttpResponseHandler;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
+import io.druid.common.utils.StringUtils;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.RE;
@@ -60,6 +61,7 @@ import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.QueryWatcher;
+import io.druid.query.ResourceLimitExceededException;
 import io.druid.query.Result;
 import io.druid.query.aggregation.MetricManipulatorFns;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -68,6 +70,7 @@ import org.jboss.netty.handler.codec.http.HttpChunk;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.joda.time.Duration;
 
 import javax.ws.rs.core.MediaType;
 import java.io.Closeable;
@@ -84,14 +87,19 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  */
 public class DirectDruidClient<T> implements QueryRunner<T>
 {
+  public static final String QUERY_FAIL_TIME = "queryFailTime";
+  public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";
+
   private static final Logger log = new Logger(DirectDruidClient.class);
 
   private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = Maps.newConcurrentMap();
@@ -168,16 +176,24 @@ public class DirectDruidClient<T> implements QueryRunner<T>
       final QueryMetrics<? super Query<T>> queryMetrics = toolChest.makeMetrics(query);
       queryMetrics.server(host);
 
+      long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
+      long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
+      AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED);
+
       final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
       {
         private long responseStartTimeNs;
         private final AtomicLong byteCount = new AtomicLong(0);
         private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
         private final AtomicBoolean done = new AtomicBoolean(false);
+        private final AtomicReference<String> fail = new AtomicReference<>();
 
         @Override
         public ClientResponse<InputStream> handleResponse(HttpResponse response)
         {
+          checkQueryTimeout();
+          checkTotalBytesLimit(response.getContent().readableBytes());
+
           log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
           responseStartTimeNs = System.nanoTime();
           queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
@@ -222,6 +238,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
                 @Override
                 public boolean hasMoreElements()
                 {
+                  if (fail.get() != null) {
+                    throw new RE(fail.get());
+                  }
+                  checkQueryTimeout();
+
                   // Done is always true until the last stream has be put in the queue.
                   // Then the stream should be spouting good InputStreams.
                   synchronized (done) {
@@ -232,8 +253,17 @@ public class DirectDruidClient<T> implements QueryRunner<T>
                 @Override
                 public InputStream nextElement()
                 {
+                  if (fail.get() != null) {
+                    throw new RE(fail.get());
+                  }
+
                   try {
-                    return queue.take();
+                    InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
+                    if (is != null) {
+                      return is;
+                    } else {
+                      throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
+                    }
                   }
                   catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -250,8 +280,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
             ClientResponse<InputStream> clientResponse, HttpChunk chunk
         )
         {
+          checkQueryTimeout();
+
           final ChannelBuffer channelBuffer = chunk.getContent();
           final int bytes = channelBuffer.readableBytes();
+
+          checkTotalBytesLimit(bytes);
+
           if (bytes > 0) {
             try {
               queue.put(new ChannelBufferInputStream(channelBuffer));
@@ -308,34 +343,78 @@ public class DirectDruidClient<T> implements QueryRunner<T>
         @Override
         public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
         {
-          // Don't wait for lock in case the lock had something to do with the error
-          synchronized (done) {
-            done.set(true);
-            // Make a best effort to put a zero length buffer into the queue in case something is waiting on the take()
-            // If nothing is waiting on take(), this will be closed out anyways.
-            queue.offer(
-                new InputStream()
-                {
-                  @Override
-                  public int read() throws IOException
-                  {
-                    throw new IOException(e);
-                  }
-                }
-            );
-          }
+          String msg = StringUtils.safeFormat(
+              "Query[%s] url[%s] failed with exception msg [%s]",
+              query.getId(),
+              url,
+              e.getMessage()
+          );
+          setupResponseReadFailure(msg, e);
+        }
+
+        private void setupResponseReadFailure(String msg, Throwable th)
+        {
+          fail.set(msg);
+          queue.clear();
+          queue.offer(new InputStream()
+          {
+            @Override
+            public int read() throws IOException
+            {
+              if (th != null) {
+                throw new IOException(msg, th);
+              } else {
+                throw new IOException(msg);
+              }
+            }
+          });
+        }
+
+        // Returns remaining timeout or throws exception if timeout already elapsed.
+        private long checkQueryTimeout()
+        {
+          long timeLeft = timeoutAt - System.currentTimeMillis();
+          if (timeLeft <= 0) {
+            String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url);
+            setupResponseReadFailure(msg, null);
+            throw new RE(msg);
+          } else {
+            return timeLeft;
+          }
+        }
+
+        private void checkTotalBytesLimit(long bytes)
+        {
+          if (maxScatterGatherBytes < Long.MAX_VALUE && totalBytesGathered.addAndGet(bytes) > maxScatterGatherBytes) {
+            String msg = StringUtils.safeFormat(
+                "Query[%s] url[%s] max scatter-gather bytes limit reached.",
+                query.getId(),
+                url
+            );
+            setupResponseReadFailure(msg, null);
+            throw new RE(msg);
+          }
         }
       };
 
+      long timeLeft = timeoutAt - System.currentTimeMillis();
+
+      if (timeLeft <= 0) {
+        throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
+      }
+
       future = httpClient.go(
           new Request(
               HttpMethod.POST,
               new URL(url)
-          ).setContent(objectMapper.writeValueAsBytes(query))
+          ).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft)))
            .setHeader(
                HttpHeaders.Names.CONTENT_TYPE,
                isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
            ),
-          responseHandler
+          responseHandler,
+          Duration.millis(timeLeft)
       );
 
       queryWatcher.registerQuery(query, future);
@@ -368,8 +447,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
                     ? SmileMediaTypes.APPLICATION_JACKSON_SMILE
                     : MediaType.APPLICATION_JSON
             ),
-            new StatusResponseHandler(Charsets.UTF_8)
-        ).get();
+            new StatusResponseHandler(Charsets.UTF_8),
+            Duration.standardSeconds(1)
+        ).get(1, TimeUnit.SECONDS);
         if (res.getStatus().getCode() >= 500) {
           throw new RE(
               "Error cancelling query[%s]: queriable node returned status[%d] [%s].",
@@ -378,7 +459,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           );
         }
       }
-      catch (IOException | ExecutionException | InterruptedException e) {
+      catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
         Throwables.propagate(e);
       }
     }
@@ -396,7 +477,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
       @Override
      public JsonParserIterator<T> make()
       {
-        return new JsonParserIterator<T>(typeRef, future, url);
+        return new JsonParserIterator<T>(typeRef, future, url, query);
       }
 
       @Override
@@ -428,13 +509,15 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     private ObjectCodec objectCodec;
     private final JavaType typeRef;
     private final Future<InputStream> future;
+    private final Query<T> query;
     private final String url;
 
-    public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url)
+    public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url, Query<T> query)
     {
       this.typeRef = typeRef;
      this.future = future;
       this.url = url;
+      this.query = query;
       jp = null;
     }
 
@@ -458,6 +541,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     public T next()
     {
       init();
+
       try {
         final T retVal = objectCodec.readValue(jp, typeRef);
         jp.nextToken();
@@ -478,7 +562,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     {
       if (jp == null) {
         try {
-          jp = objectMapper.getFactory().createParser(future.get());
+          InputStream is = future.get();
+          if (is == null) {
+            throw new QueryInterruptedException(
+                new ResourceLimitExceededException(
+                    "query[%s] url[%s] timed out or max bytes limit reached.",
+                    query.getId(),
+                    url
+                ),
+                host
+            );
+          } else {
+            jp = objectMapper.getFactory().createParser(is);
+          }
           final JsonToken nextToken = jp.nextToken();
           if (nextToken == JsonToken.START_OBJECT) {
             QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
@@ -491,7 +587,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           }
         }
         catch (IOException | InterruptedException | ExecutionException e) {
-          throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
+          throw new RE(
+              e,
+              "Failure getting results for query[%s] url[%s] because of [%s]",
+              query.getId(),
+              url,
+              e.getMessage()
+          );
         }
         catch (CancellationException e) {
           throw new QueryInterruptedException(e, host);
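
To summarize the deadline bookkeeping this file relies on: the server computes one absolute fail time per query and stores it in the response context, and every subsequent check recomputes the remaining budget from that single timestamp. A minimal self-contained sketch (the class and names are illustrative, not part of the patch):

```java
// Minimal sketch of the absolute-deadline pattern used by DirectDruidClient.
final class DeadlineSketch
{
  private final long failTimeMillis; // absolute wall-clock deadline, set once

  DeadlineSketch(long timeoutMillis)
  {
    this.failTimeMillis = System.currentTimeMillis() + timeoutMillis;
  }

  // Mirrors checkQueryTimeout(): returns the remaining budget in millis,
  // or throws once the deadline has passed. The remaining budget is what
  // gets forwarded to data nodes via QueryContexts.withTimeout(...).
  long remainingOrThrow()
  {
    long timeLeft = failTimeMillis - System.currentTimeMillis();
    if (timeLeft <= 0) {
      throw new RuntimeException("query timed out");
    }
    return timeLeft;
  }
}
```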

File: QueryResource.java

@@ -30,6 +30,7 @@ import com.google.common.io.CountingOutputStream;
 import com.google.inject.Inject;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.client.DirectDruidClient;
 import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.ISE;
@@ -186,6 +187,8 @@ public class QueryResource implements QueryCountStatsProvider
     final String currThreadName = Thread.currentThread().getName();
     try {
+      final Map<String, Object> responseContext = new MapMaker().makeMap();
+
       query = context.getObjectMapper().readValue(in, Query.class);
       queryId = query.getId();
       if (queryId == null) {
@@ -193,6 +196,13 @@ public class QueryResource implements QueryCountStatsProvider
         query = query.withId(queryId);
       }
       query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout());
+      query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes());
+
+      responseContext.put(
+          DirectDruidClient.QUERY_FAIL_TIME,
+          System.currentTimeMillis() + QueryContexts.getTimeout(query)
+      );
+      responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
 
       toolChest = warehouse.getToolChest(query);
@@ -227,7 +237,6 @@ public class QueryResource implements QueryCountStatsProvider
         );
       }
 
-      final Map<String, Object> responseContext = new MapMaker().makeMap();
       final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext);
 
       if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) {
@@ -330,6 +339,9 @@ public class QueryResource implements QueryCountStatsProvider
               responseContext.remove(HDR_ETAG);
             }
 
+            responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME);
+            responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED);
+
             //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331
             //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
             //and encodes the string using ASCII, so 1 char is = 1 byte
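
In short, the resource now seeds two entries into the response context before running the query, and strips them again before the context is echoed back in the response header. A hypothetical seeding helper (names beyond the two `DirectDruidClient` keys are illustrative):

```java
import io.druid.client.DirectDruidClient;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ResponseContextSeeding
{
  // Sketch: build the per-query bookkeeping that DirectDruidClient reads.
  // QUERY_FAIL_TIME is an absolute deadline; QUERY_TOTAL_BYTES_GATHERED is a
  // counter shared by every data-node connection serving this query.
  public static Map<String, Object> seed(long timeoutMillis)
  {
    Map<String, Object> responseContext = new HashMap<>();
    responseContext.put(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + timeoutMillis);
    responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
    return responseContext;
  }
}
```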

File: ServerConfig.java

@@ -41,6 +41,10 @@ public class ServerConfig
   @Min(0)
   private long defaultQueryTimeout = 300_000; // 5 minutes
 
+  @JsonProperty
+  @Min(1)
+  private long maxScatterGatherBytes = Long.MAX_VALUE;
+
   public int getNumThreads()
   {
     return numThreads;
@@ -56,6 +60,11 @@ public class ServerConfig
     return defaultQueryTimeout;
   }
 
+  public long getMaxScatterGatherBytes()
+  {
+    return maxScatterGatherBytes;
+  }
+
   @Override
   public String toString()
   {
@@ -63,6 +72,7 @@ public class ServerConfig
            "numThreads=" + numThreads +
            ", maxIdleTime=" + maxIdleTime +
            ", defaultQueryTimeout=" + defaultQueryTimeout +
+           ", maxScatterGatherBytes=" + maxScatterGatherBytes +
            '}';
   }
 }

File: DirectDruidClientTest.java

@@ -51,6 +51,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -60,9 +61,20 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DirectDruidClientTest
 {
+  private final Map<String, Object> defaultContext;
+
+  public DirectDruidClientTest()
+  {
+    defaultContext = new HashMap<>();
+    defaultContext.put(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE);
+    defaultContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
+  }
+
   @Test
   public void testRun() throws Exception
   {
@@ -74,7 +86,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
             .andReturn(futureResult)
@@ -84,7 +97,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
            .andReturn(futureException)
@@ -93,7 +107,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
            .andReturn(SettableFuture.create())
@@ -145,23 +160,23 @@ public class DirectDruidClientTest
     serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
 
     TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    HashMap<String, List> context = Maps.newHashMap();
-    Sequence s1 = client1.run(query, context);
+    Sequence s1 = client1.run(query, defaultContext);
     Assert.assertTrue(capturedRequest.hasCaptured());
     Assert.assertEquals(url, capturedRequest.getValue().getUrl());
     Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
     Assert.assertEquals(1, client1.getNumOpenConnections());
 
     // simulate read timeout
-    Sequence s2 = client1.run(query, context);
+    Sequence s2 = client1.run(query, defaultContext);
     Assert.assertEquals(2, client1.getNumOpenConnections());
     futureException.setException(new ReadTimeoutException());
     Assert.assertEquals(1, client1.getNumOpenConnections());
 
     // subsequent connections should work
-    Sequence s3 = client1.run(query, context);
-    Sequence s4 = client1.run(query, context);
-    Sequence s5 = client1.run(query, context);
+    Sequence s3 = client1.run(query, defaultContext);
+    Sequence s4 = client1.run(query, defaultContext);
+    Sequence s5 = client1.run(query, defaultContext);
 
     Assert.assertTrue(client1.getNumOpenConnections() == 4);
 
@@ -172,8 +187,8 @@ public class DirectDruidClientTest
     Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
     Assert.assertEquals(3, client1.getNumOpenConnections());
 
-    client2.run(query, context);
-    client2.run(query, context);
+    client2.run(query, defaultContext);
+    client2.run(query, defaultContext);
 
     Assert.assertTrue(client2.getNumOpenConnections() == 2);
 
@@ -194,7 +209,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
            .andReturn(cancelledFuture)
@@ -203,7 +219,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
            .andReturn(cancellationFuture)
@@ -242,9 +259,8 @@ public class DirectDruidClientTest
     serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
 
     TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    HashMap<String, List> context = Maps.newHashMap();
     cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
-    Sequence results = client1.run(query, context);
+    Sequence results = client1.run(query, defaultContext);
     Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod());
     Assert.assertEquals(0, client1.getNumOpenConnections());
 
@@ -271,7 +287,8 @@ public class DirectDruidClientTest
     EasyMock.expect(
         httpClient.go(
             EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject()
+            EasyMock.<HttpResponseHandler>anyObject(),
+            EasyMock.anyObject(Duration.class)
         )
     )
            .andReturn(interruptionFuture)
@@ -312,9 +329,8 @@ public class DirectDruidClientTest
     serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);
 
     TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    HashMap<String, List> context = Maps.newHashMap();
     interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}".getBytes()));
-    Sequence results = client1.run(query, context);
+    Sequence results = client1.run(query, defaultContext);
 
     QueryInterruptedException actualException = null;
     try {