Improved exception handling in case of query timeouts (#10464)

* Separate timeout exceptions

* Add more tests

Co-authored-by: Atul Mohan <atulmohan@yahoo-inc.com>
This commit is contained in:
Atul Mohan 2020-11-03 09:00:33 -06:00 committed by GitHub
parent d8e5a159e8
commit 6ccddedb7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 407 additions and 78 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
@ -187,7 +188,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (hasTimeout && thisTimeoutNanos < 0) {
throw new RE(new TimeoutException("Sequence iterator timed out"));
throw new QueryTimeoutException("Sequence iterator timed out");
}
if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) {
@ -202,7 +203,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
}
}
if (currentBatch == null) {
throw new RE(new TimeoutException("Sequence iterator timed out waiting for data"));
throw new QueryTimeoutException("Sequence iterator timed out waiting for data");
}
if (cancellationGizmo.isCancelled()) {

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.net.InetAddress;
/**
* This exception is thrown when a query does not finish before the configured query timeout.
* {@link java.util.concurrent.TimeoutException} exceptions encountered during the lifecycle of a query
* are rethrown as this exception.
* <p>
* As a {@link QueryException}, it is expected to be serialized to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
*/
public class QueryTimeoutException extends QueryException
{
private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
public static final String ERROR_CODE = "Query timeout";
public static final String ERROR_MESSAGE = "Query Timed Out!";
public static final int STATUS_CODE = 504;
@JsonCreator
public QueryTimeoutException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
@JsonProperty("host") @Nullable String host
)
{
super(errorCode, errorMessage, errorClass, host);
}
public QueryTimeoutException()
{
super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage, String host)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
}
private static String resolveHostname()
{
String host;
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (Exception e) {
host = null;
}
return host;
}
}

View File

@ -23,7 +23,7 @@ import com.google.common.collect.Ordering;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.hamcrest.Matchers;
import org.apache.druid.query.QueryTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -39,7 +39,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@ -511,8 +510,7 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out waiting for data");
assertException(
@ -534,8 +532,7 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out");
assertException(input, 8, 64, 1000, 500);
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class QueryTimeoutExceptionTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
QueryTimeoutException timeoutException = mapper.readValue(
mapper.writeValueAsBytes(new QueryTimeoutException()),
QueryTimeoutException.class
);
QueryTimeoutException timeoutExceptionWithMsg = mapper.readValue(mapper.writeValueAsBytes(new QueryTimeoutException(
"Another query timeout")), QueryTimeoutException.class);
Assert.assertEquals(
"Query timeout",
timeoutException.getErrorCode()
);
Assert.assertEquals(
"Query Timed Out!",
timeoutException.getMessage()
);
Assert.assertEquals(
"Another query timeout",
timeoutExceptionWithMsg.getMessage()
);
Assert.assertEquals(
"org.apache.druid.query.QueryTimeoutException",
timeoutExceptionWithMsg.getErrorClass()
);
}
@Test
public void testExceptionHost()
{
Assert.assertEquals(
"timeouthost",
new QueryTimeoutException("Timed out", "timeouthost").getHost()
);
}
}

View File

@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeIterable;
import org.apache.druid.java.util.common.guava.Sequence;
@ -132,6 +133,9 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
catch (QueryInterruptedException e) {
throw new RuntimeException(e);
}
catch (QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
log.noStackTrace().error(e, "Exception with one of the sequences!");
Throwables.propagateIfPossible(e);
@ -167,7 +171,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
catch (TimeoutException e) {
log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);

View File

@ -35,6 +35,7 @@ import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -201,7 +202,7 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
closeOnFailure.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);

View File

@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
/**
* Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually
@ -33,7 +32,7 @@ import java.util.concurrent.TimeoutException;
*
* Fields:
* - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception"
* represents all wrapped exceptions other than interrupt, timeout, cancellation, resource limit exceeded, unauthorized
* represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized
* request, and unsupported operation.
* - "errorMessage" is the toString of the wrapped exception
* - "errorClass" is the class of the wrapped exception
@ -45,7 +44,6 @@ import java.util.concurrent.TimeoutException;
public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
public static final String UNAUTHORIZED = "Unauthorized request.";
@ -100,8 +98,6 @@ public class QueryInterruptedException extends QueryException
return QUERY_INTERRUPTED;
} else if (e instanceof CancellationException) {
return QUERY_CANCELLED;
} else if (e instanceof TimeoutException) {
return QUERY_TIMEOUT;
} else if (e instanceof ResourceLimitExceededException) {
return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {

View File

@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@ -361,10 +362,14 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final long timeout = queryTimeoutAt - System.currentTimeMillis();
return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
}
catch (InterruptedException | TimeoutException | CancellationException e) {
catch (InterruptedException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryTimeoutException();
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new RuntimeException(e.getCause());

View File

@ -49,6 +49,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.context.ResponseContext;
@ -245,7 +246,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
return input.run(queryPlusForRunners, responseContext)
.accumulate(AggregateResult.ok(), accumulator);
}
catch (QueryInterruptedException e) {
catch (QueryInterruptedException | QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
@ -321,16 +322,19 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0) {
throw new TimeoutException();
throw new QueryTimeoutException();
}
if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
throw new TimeoutException("Cannot acquire enough merge buffers");
throw new QueryTimeoutException("Cannot acquire enough merge buffers");
}
} else {
mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
}
return mergeBufferHolder;
}
catch (QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
throw new QueryInterruptedException(e);
}
@ -350,7 +354,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
}
if (hasTimeout && timeout <= 0) {
throw new TimeoutException();
throw new QueryTimeoutException();
}
final List<AggregateResult> results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
@ -374,7 +378,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException();
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory;
@ -33,7 +34,6 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A streaming grouper which can aggregate sorted inputs. This grouper can aggregate while its iterator is being
@ -302,7 +302,7 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
@ -321,7 +321,7 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextWriteIndex == nextReadIndex) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
@ -470,7 +470,7 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
while ((curWriteIndex == -1 || target == curWriteIndex) &&
!finished && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {

View File

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -37,6 +38,7 @@ import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -237,7 +239,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
throw new RuntimeException(e);

View File

@ -25,13 +25,14 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
@ -52,7 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeoutException;
public class ScanQueryEngine
{
@ -174,7 +174,7 @@ public class ScanQueryEngine
throw new NoSuchElementException();
}
if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
throw new QueryInterruptedException(new TimeoutException());
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
final long lastOffset = offset;
final Object events;

View File

@ -269,14 +269,14 @@ public class ChainedExecutionQueryRunnerTest
ListenableFuture future = capturedFuture.getValue();
// wait for query to time out
QueryInterruptedException cause = null;
QueryTimeoutException cause = null;
try {
resultFuture.get();
}
catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
Assert.assertEquals("Query timeout", ((QueryInterruptedException) e.getCause()).getErrorCode());
cause = (QueryInterruptedException) e.getCause();
Assert.assertTrue(e.getCause() instanceof QueryTimeoutException);
Assert.assertEquals("Query timeout", ((QueryTimeoutException) e.getCause()).getErrorCode());
cause = (QueryTimeoutException) e.getCause();
}
queriesInterrupted.await();
Assert.assertNotNull(cause);

View File

@ -27,7 +27,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
public class QueryInterruptedExceptionTest
{
@ -42,7 +41,6 @@ public class QueryInterruptedExceptionTest
);
Assert.assertEquals("Query cancelled", new QueryInterruptedException(new CancellationException()).getErrorCode());
Assert.assertEquals("Query interrupted", new QueryInterruptedException(new InterruptedException()).getErrorCode());
Assert.assertEquals("Query timeout", new QueryInterruptedException(new TimeoutException()).getErrorCode());
Assert.assertEquals("Unsupported operation", new QueryInterruptedException(new UOE("Unsupported")).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(null).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(new ISE("Something bad!")).getErrorCode());
@ -71,10 +69,6 @@ public class QueryInterruptedExceptionTest
null,
new QueryInterruptedException(new InterruptedException()).getMessage()
);
Assert.assertEquals(
null,
new QueryInterruptedException(new TimeoutException()).getMessage()
);
Assert.assertEquals(
null,
new QueryInterruptedException(null).getMessage()
@ -108,10 +102,6 @@ public class QueryInterruptedExceptionTest
"java.lang.InterruptedException",
new QueryInterruptedException(new InterruptedException()).getErrorClass()
);
Assert.assertEquals(
"java.util.concurrent.TimeoutException",
new QueryInterruptedException(new TimeoutException()).getErrorClass()
);
Assert.assertEquals(
"org.apache.druid.query.ResourceLimitExceededException",
new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getErrorClass()
@ -162,10 +152,6 @@ public class QueryInterruptedExceptionTest
"java.lang.InterruptedException",
roundTrip(new QueryInterruptedException(new InterruptedException())).getErrorClass()
);
Assert.assertEquals(
"java.util.concurrent.TimeoutException",
roundTrip(new QueryInterruptedException(new TimeoutException())).getErrorClass()
);
Assert.assertEquals(
null,
roundTrip(new QueryInterruptedException(null)).getErrorClass()

View File

@ -33,16 +33,15 @@ import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
@ -56,7 +55,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest
@ -182,8 +180,7 @@ public class GroupByQueryRunnerFailureTest
@Test(timeout = 60_000L)
public void testNotEnoughMergeBuffersOnQueryable()
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Cannot acquire enough merge buffers");
final GroupByQuery query = GroupByQuery
@ -283,8 +280,7 @@ public class GroupByQueryRunnerFailureTest
@Test(timeout = 60_000L)
public void testTimeoutExceptionOnQueryable()
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
final GroupByQuery query = GroupByQuery
.builder()

View File

@ -27,6 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
@ -170,6 +171,63 @@ public class ConcurrentGrouperTest
grouper.close();
}
@Test
public void testGrouperTimeout() throws Exception
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
8,
null,
false,
MoreExecutors.listeningDecorator(SERVICE),
0,
true,
1,
4,
8
);
grouper.init();
final int numRows = 1000;
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (long i = 0; i < numRows; i++) {
grouper.aggregate(i);
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
try {
grouper.iterator(true);
}
catch (RuntimeException e) {
Assert.assertTrue(e instanceof QueryTimeoutException);
Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode());
}
grouper.close();
}
static class TestResourceHolder extends ReferenceCountingResourceHolder<ByteBuffer>
{
private boolean taken;

View File

@ -28,11 +28,12 @@ import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.hamcrest.CoreMatchers;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -45,9 +46,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
public class StreamingMergeSortedGrouperTest
public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -161,8 +161,7 @@ public class StreamingMergeSortedGrouperTest
@Test
public void testTimeout()
{
expectedException.expect(RuntimeException.class);
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 100);

View File

@ -38,10 +38,14 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.AndDimFilter;
@ -900,6 +904,25 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
}
}
@Test
public void testScanQueryTimeout()
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.virtualColumns(EXPR_COLUMN)
.context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
.build();
ResponseContext responseContext = DefaultResponseContext.createEmpty();
responseContext.add(ResponseContext.Key.TIMEOUT_AT, System.currentTimeMillis());
try {
runner.run(QueryPlus.wrap(query), responseContext).toList();
}
catch (RuntimeException e) {
Assert.assertTrue(e instanceof QueryTimeoutException);
Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode());
}
}
private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
{
return toEvents(

View File

@ -48,6 +48,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
@ -211,7 +212,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
{
final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
if (holder == null) {
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
}
final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
@ -428,7 +429,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (timeLeft <= 0) {
String msg = StringUtils.format("Query[%s] url[%s] timed out.", query.getId(), url);
setupResponseReadFailure(msg, null);
throw new RE(msg);
throw new QueryTimeoutException(msg);
} else {
return timeLeft;
}
@ -451,7 +452,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
long timeLeft = timeoutAt - System.currentTimeMillis();
if (timeLeft <= 0) {
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
}
future = httpClient.go(

View File

@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ResourceLimitExceededException;
import javax.annotation.Nullable;
@ -112,9 +113,9 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
catch (IOException e) {
// check for timeout, a failure here might be related to a timeout, so lets just attribute it
if (checkTimeout()) {
TimeoutException timeoutException = timeoutQuery();
QueryTimeoutException timeoutException = timeoutQuery();
timeoutException.addSuppressed(e);
throw interruptQuery(timeoutException);
throw timeoutException;
} else {
throw interruptQuery(e);
}
@ -155,14 +156,14 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
try {
long timeLeftMillis = timeoutAt - System.currentTimeMillis();
if (checkTimeout(timeLeftMillis)) {
throw interruptQuery(timeoutQuery());
throw timeoutQuery();
}
InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get();
if (is != null) {
jp = objectMapper.getFactory().createParser(is);
} else if (checkTimeout()) {
throw interruptQuery(timeoutQuery());
throw timeoutQuery();
} else {
// if we haven't timed out completing the future, then this is the likely cause
throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url));
@ -180,15 +181,18 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
);
}
}
catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
catch (IOException | InterruptedException | ExecutionException | CancellationException e) {
throw interruptQuery(e);
}
catch (TimeoutException e) {
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out!", queryId), host);
}
}
}
private TimeoutException timeoutQuery()
private QueryTimeoutException timeoutQuery()
{
return new TimeoutException(StringUtils.format("url[%s] timed out", url));
return new QueryTimeoutException(StringUtils.nonStrictFormat("url[%s] timed out", url), host);
}
private QueryInterruptedException interruptQuery(Exception cause)

View File

@ -40,6 +40,7 @@ import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.context.ResponseContext;
@ -331,7 +332,7 @@ public class QueryLifecycle
if (e != null) {
statsMap.put("exception", e.toString());
log.noStackTrace().warn(e, "Exception while processing queryId [%s]", baseQuery.getId());
if (e instanceof QueryInterruptedException) {
if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) {
// Mimic behavior from QueryResource, where this code was originally taken from.
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());

View File

@ -44,6 +44,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.TruncatedResponseContextException;
@ -330,6 +331,11 @@ public class QueryResource implements QueryCountStatsProvider
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
return ioReaderWriter.gotError(e);
}
catch (QueryTimeoutException timeout) {
interruptedQueryCount.incrementAndGet();
queryLifecycle.emitLogsAndMetrics(timeout, req.getRemoteAddr(), -1);
return ioReaderWriter.gotTimeout(timeout);
}
catch (QueryCapacityExceededException cap) {
failedQueryCount.incrementAndGet();
queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1);
@ -465,6 +471,17 @@ public class QueryResource implements QueryCountStatsProvider
.build();
}
Response gotTimeout(QueryTimeoutException e) throws IOException
{
return Response.status(QueryTimeoutException.STATUS_CODE)
.type(contentType)
.entity(
newOutputWriter(null, null, false)
.writeValueAsBytes(e)
)
.build();
}
Response gotLimited(QueryCapacityExceededException e) throws IOException
{
return Response.status(QueryCapacityExceededException.STATUS_CODE)

View File

@ -40,6 +40,7 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
@ -93,8 +94,8 @@ public class DirectDruidClientTest
{
httpClient = EasyMock.createMock(HttpClient.class);
serverSelector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
dataSegment,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
client = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
@ -354,7 +355,7 @@ public class DirectDruidClientTest
in
);
QueryInterruptedException actualException = null;
QueryTimeoutException actualException = null;
try {
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
Thread.sleep(250);
@ -362,7 +363,7 @@ public class DirectDruidClientTest
out.close();
results.toList();
}
catch (QueryInterruptedException e) {
catch (QueryTimeoutException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
@ -399,16 +400,16 @@ public class DirectDruidClientTest
Sequence results = client.run(QueryPlus.wrap(query));
QueryInterruptedException actualException = null;
QueryTimeoutException actualException = null;
try {
results.toList();
}
catch (QueryInterruptedException e) {
catch (QueryTimeoutException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
Assert.assertEquals("Timeout waiting for task.", actualException.getMessage());
Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.Result;
@ -629,6 +630,62 @@ public class QueryResourceTest
);
}
@Test
public void testQueryTimeoutException() throws Exception
{
final QuerySegmentWalker timeoutSegmentWalker = new QuerySegmentWalker()
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
throw new QueryTimeoutException();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
return getQueryRunnerForIntervals(null, null);
}
};
final QueryResource timeoutQueryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
timeoutSegmentWalker,
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
null,
ResponseContextConfig.newConfig(true),
DRUID_NODE
);
expectPermissiveHappyPathAuth();
Response response = timeoutQueryResource.doPost(
new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
null /*pretty*/,
testServletRequest
);
Assert.assertNotNull(response);
Assert.assertEquals(QueryTimeoutException.STATUS_CODE, response.getStatus());
QueryTimeoutException ex;
try {
ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryTimeoutException.class);
}
catch (IOException e) {
throw new RuntimeException(e);
}
Assert.assertEquals("Query Timed Out!", ex.getMessage());
Assert.assertEquals(QueryTimeoutException.ERROR_CODE, ex.getErrorCode());
}
@Test(timeout = 60_000L)
public void testSecuredCancelQuery() throws Exception
{

View File

@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.RequestLogger;
@ -340,7 +341,7 @@ public class SqlLifecycle
if (e != null) {
statsMap.put("exception", e.toString());
if (e instanceof QueryInterruptedException) {
if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) {
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.server.security.ForbiddenException;
@ -182,6 +183,10 @@ public class SqlResource
lifecycle.emitLogsAndMetrics(unsupported, remoteAddr, -1);
return Response.status(QueryUnsupportedException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(unsupported)).build();
}
catch (QueryTimeoutException timeout) {
lifecycle.emitLogsAndMetrics(timeout, remoteAddr, -1);
return Response.status(QueryTimeoutException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(timeout)).build();
}
catch (ForbiddenException e) {
throw e; // let ForbiddenExceptionMapper handle this
}

View File

@ -38,9 +38,11 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.QueryCapacityExceededException;
@ -801,6 +803,25 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
}
@Test
public void testQueryTimeoutException() throws Exception
{
Map<String, Object> queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1);
final QueryException timeoutException = doPost(
new SqlQuery(
"SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
ResultFormat.OBJECT,
false,
queryContext,
null
)
).lhs;
Assert.assertNotNull(timeoutException);
Assert.assertEquals(timeoutException.getErrorCode(), QueryTimeoutException.ERROR_CODE);
Assert.assertEquals(timeoutException.getErrorClass(), QueryTimeoutException.class.getName());
}
@SuppressWarnings("unchecked")
private void checkSqlRequestLog(boolean success)
{