diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 7ab591a7d3d..8638b3917c9 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -25,6 +25,7 @@ import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -187,7 +188,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); if (hasTimeout && thisTimeoutNanos < 0) { - throw new RE(new TimeoutException("Sequence iterator timed out")); + throw new QueryTimeoutException("Sequence iterator timed out"); } if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) { @@ -202,7 +203,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase } } if (currentBatch == null) { - throw new RE(new TimeoutException("Sequence iterator timed out waiting for data")); + throw new QueryTimeoutException("Sequence iterator timed out waiting for data"); } if (cancellationGizmo.isCancelled()) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryException.java b/core/src/main/java/org/apache/druid/query/QueryException.java similarity index 100% rename from processing/src/main/java/org/apache/druid/query/QueryException.java rename to core/src/main/java/org/apache/druid/query/QueryException.java diff --git a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java new file mode 100644 index 00000000000..6eca438190f --- /dev/null +++ b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.net.InetAddress; + +/** + * This exception is thrown when a query does not finish before the configured query timeout. + * {@link java.util.concurrent.TimeoutException} exceptions encountered during the lifecycle of a query + * are rethrown as this exception. + *

+ * As a {@link QueryException}, it is expected to be serialized to a json response, but will be mapped to + * {@link #STATUS_CODE} instead of the default HTTP 500 status. + */ + +public class QueryTimeoutException extends QueryException +{ + private static final String ERROR_CLASS = QueryTimeoutException.class.getName(); + public static final String ERROR_CODE = "Query timeout"; + public static final String ERROR_MESSAGE = "Query Timed Out!"; + public static final int STATUS_CODE = 504; + + @JsonCreator + public QueryTimeoutException( + @JsonProperty("error") @Nullable String errorCode, + @JsonProperty("errorMessage") String errorMessage, + @JsonProperty("errorClass") @Nullable String errorClass, + @JsonProperty("host") @Nullable String host + ) + { + super(errorCode, errorMessage, errorClass, host); + } + + public QueryTimeoutException() + { + super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname()); + } + + public QueryTimeoutException(String errorMessage) + { + super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname()); + } + + public QueryTimeoutException(String errorMessage, String host) + { + super(ERROR_CODE, errorMessage, ERROR_CLASS, host); + } + + + private static String resolveHostname() + { + String host; + try { + host = InetAddress.getLocalHost().getCanonicalHostName(); + } + catch (Exception e) { + host = null; + } + return host; + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java index e459c38db89..4f9f9e2dac5 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java @@ -23,7 +23,7 @@ import com.google.common.collect.Ordering; import org.apache.druid.common.guava.CombiningSequence; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; -import org.hamcrest.Matchers; +import org.apache.druid.query.QueryTimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -39,7 +39,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BinaryOperator; import java.util.function.Consumer; @@ -511,8 +510,7 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(someSize)); input.add(nonBlockingSequence(someSize)); input.add(blockingSequence(someSize, 400, 500, 1, 500, true)); - expectedException.expect(RuntimeException.class); - expectedException.expectCause(Matchers.instanceOf(TimeoutException.class)); + expectedException.expect(QueryTimeoutException.class); expectedException.expectMessage("Sequence iterator timed out waiting for data"); assertException( @@ -534,8 +532,7 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(someSize)); input.add(nonBlockingSequence(someSize)); - expectedException.expect(RuntimeException.class); - expectedException.expectCause(Matchers.instanceOf(TimeoutException.class)); + expectedException.expect(QueryTimeoutException.class); expectedException.expectMessage("Sequence iterator timed out"); assertException(input, 8, 64, 1000, 500); } diff --git 
a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java new file mode 100644 index 00000000000..ab187a1fb42 --- /dev/null +++ b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class QueryTimeoutExceptionTest +{ + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + QueryTimeoutException timeoutException = mapper.readValue( + mapper.writeValueAsBytes(new QueryTimeoutException()), + QueryTimeoutException.class + ); + QueryTimeoutException timeoutExceptionWithMsg = mapper.readValue(mapper.writeValueAsBytes(new QueryTimeoutException( + "Another query timeout")), QueryTimeoutException.class); + + Assert.assertEquals( + "Query timeout", + timeoutException.getErrorCode() + ); + Assert.assertEquals( + "Query Timed Out!", + timeoutException.getMessage() + ); + Assert.assertEquals( + "Another query timeout", + timeoutExceptionWithMsg.getMessage() + ); + Assert.assertEquals( + "org.apache.druid.query.QueryTimeoutException", + timeoutExceptionWithMsg.getErrorClass() + ); + } + + @Test + public void testExceptionHost() + { + Assert.assertEquals( + "timeouthost", + new QueryTimeoutException("Timed out", "timeouthost").getHost() + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java index e30e9d1d8e5..6269493dfd1 100644 --- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.MergeIterable; import org.apache.druid.java.util.common.guava.Sequence; @@ -132,6 +133,9 @@ public class ChainedExecutionQueryRunner implements QueryRunner catch (QueryInterruptedException e) { throw new RuntimeException(e); } + catch (QueryTimeoutException e) { + throw e; + } catch (Exception e) { log.noStackTrace().error(e, "Exception with one of the sequences!"); Throwables.propagateIfPossible(e); @@ -167,7 +171,7 @@ public 
class ChainedExecutionQueryRunner implements QueryRunner catch (TimeoutException e) { log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId()); GuavaUtils.cancelAll(true, future, futures); - throw new QueryInterruptedException(e); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId())); } catch (ExecutionException e) { GuavaUtils.cancelAll(true, future, futures); diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java index 8845f2a3a36..85a96014ef7 100644 --- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java @@ -35,6 +35,7 @@ import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -201,7 +202,7 @@ public class GroupByMergedQueryRunner implements QueryRunner closeOnFailure.close(); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); GuavaUtils.cancelAll(true, future, futures); - throw new QueryInterruptedException(e); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId())); } catch (ExecutionException e) { GuavaUtils.cancelAll(true, future, futures); diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java index 206e1145333..b174c23857c 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; import java.util.concurrent.CancellationException; -import java.util.concurrent.TimeoutException; /** * Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually @@ -33,7 +32,7 @@ import java.util.concurrent.TimeoutException; * * Fields: * - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception" - * represents all wrapped exceptions other than interrupt, timeout, cancellation, resource limit exceeded, unauthorized + * represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized * request, and unsupported operation. 
* - "errorMessage" is the toString of the wrapped exception * - "errorClass" is the class of the wrapped exception @@ -45,7 +44,6 @@ import java.util.concurrent.TimeoutException; public class QueryInterruptedException extends QueryException { public static final String QUERY_INTERRUPTED = "Query interrupted"; - public static final String QUERY_TIMEOUT = "Query timeout"; public static final String QUERY_CANCELLED = "Query cancelled"; public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded"; public static final String UNAUTHORIZED = "Unauthorized request."; @@ -100,8 +98,6 @@ public class QueryInterruptedException extends QueryException return QUERY_INTERRUPTED; } else if (e instanceof CancellationException) { return QUERY_CANCELLED; - } else if (e instanceof TimeoutException) { - return QUERY_TIMEOUT; } else if (e instanceof ResourceLimitExceededException) { return RESOURCE_LIMIT_EXCEEDED; } else if (e instanceof UnsupportedOperationException) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 73c1b64397e..980b34ef4d2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.AbstractPrioritizedCallable; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -361,10 +362,14 @@ public class ConcurrentGrouper implements Grouper final long timeout = queryTimeoutAt - System.currentTimeMillis(); return hasQueryTimeout ? 
future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); } - catch (InterruptedException | TimeoutException | CancellationException e) { + catch (InterruptedException | CancellationException e) { GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } + catch (TimeoutException e) { + GuavaUtils.cancelAll(true, future, futures); + throw new QueryTimeoutException(); + } catch (ExecutionException e) { GuavaUtils.cancelAll(true, future, futures); throw new RuntimeException(e.getCause()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 6d0563cdf51..f9cd21b9c04 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -49,6 +49,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.context.ResponseContext; @@ -245,7 +246,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner return input.run(queryPlusForRunners, responseContext) .accumulate(AggregateResult.ok(), accumulator); } - catch (QueryInterruptedException e) { + catch (QueryInterruptedException | QueryTimeoutException e) { throw e; } catch (Exception e) { @@ -321,16 +322,19 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner if (hasTimeout) { final long timeout = timeoutAt - System.currentTimeMillis(); if (timeout <= 0) { - throw new TimeoutException(); + throw new QueryTimeoutException(); } if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) { - throw new TimeoutException("Cannot acquire enough merge buffers"); + throw new QueryTimeoutException("Cannot acquire enough merge buffers"); } } else { mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers); } return mergeBufferHolder; } + catch (QueryTimeoutException e) { + throw e; + } catch (Exception e) { throw new QueryInterruptedException(e); } @@ -350,7 +354,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } if (hasTimeout && timeout <= 0) { - throw new TimeoutException(); + throw new QueryTimeoutException(); } final List results = hasTimeout ? 
future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); @@ -374,7 +378,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); GuavaUtils.cancelAll(true, future, futures); - throw new QueryInterruptedException(e); + throw new QueryTimeoutException(); } catch (ExecutionException e) { GuavaUtils.cancelAll(true, future, futures); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java index a5ca158d9e2..573ec2d21a8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.segment.ColumnSelectorFactory; @@ -33,7 +34,6 @@ import org.apache.druid.segment.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * A streaming grouper which can aggregate sorted inputs. This grouper can aggregate while its iterator is being @@ -302,7 +302,7 @@ public class StreamingMergeSortedGrouper implements Grouper // The below condition is checked in a while loop instead of using a lock to avoid frequent thread park. while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) { if (timeoutNs <= 0L) { - throw new RuntimeException(new TimeoutException()); + throw new QueryTimeoutException(); } // Thread.yield() should not be called from the very beginning if (spinTimeoutNs <= 0L) { @@ -321,7 +321,7 @@ public class StreamingMergeSortedGrouper implements Grouper // The below condition is checked in a while loop instead of using a lock to avoid frequent thread park. 
while ((nextWriteIndex == nextReadIndex) && !Thread.currentThread().isInterrupted()) { if (timeoutNs <= 0L) { - throw new RuntimeException(new TimeoutException()); + throw new QueryTimeoutException(); } // Thread.yield() should not be called from the very beginning if (spinTimeoutNs <= 0L) { @@ -470,7 +470,7 @@ public class StreamingMergeSortedGrouper implements Grouper while ((curWriteIndex == -1 || target == curWriteIndex) && !finished && !Thread.currentThread().isInterrupted()) { if (timeoutNs <= 0L) { - throw new RuntimeException(new TimeoutException()); + throw new QueryTimeoutException(); } // Thread.yield() should not be called from the very beginning if (spinTimeoutNs <= 0L) { diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 5e2ba0e54a1..24980fea8dd 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -37,6 +38,7 @@ import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -237,7 +239,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory= timeoutAt) { - throw new QueryInterruptedException(new TimeoutException()); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId())); } final long lastOffset = offset; final Object events; diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java index b2385044b56..cf822927fc6 100644 --- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java @@ -269,14 +269,14 @@ public class ChainedExecutionQueryRunnerTest ListenableFuture future = capturedFuture.getValue(); // wait for query to time out - QueryInterruptedException cause = null; + QueryTimeoutException cause = null; try { resultFuture.get(); } catch (ExecutionException e) { - Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); - Assert.assertEquals("Query timeout", ((QueryInterruptedException) e.getCause()).getErrorCode()); - cause = (QueryInterruptedException) e.getCause(); + Assert.assertTrue(e.getCause() instanceof QueryTimeoutException); + Assert.assertEquals("Query timeout", ((QueryTimeoutException) e.getCause()).getErrorCode()); + cause = (QueryTimeoutException) e.getCause(); } queriesInterrupted.await(); Assert.assertNotNull(cause); diff --git 
a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java index 11ec31aebe3..5116a4cafdd 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java @@ -27,7 +27,6 @@ import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CancellationException; -import java.util.concurrent.TimeoutException; public class QueryInterruptedExceptionTest { @@ -42,7 +41,6 @@ public class QueryInterruptedExceptionTest ); Assert.assertEquals("Query cancelled", new QueryInterruptedException(new CancellationException()).getErrorCode()); Assert.assertEquals("Query interrupted", new QueryInterruptedException(new InterruptedException()).getErrorCode()); - Assert.assertEquals("Query timeout", new QueryInterruptedException(new TimeoutException()).getErrorCode()); Assert.assertEquals("Unsupported operation", new QueryInterruptedException(new UOE("Unsupported")).getErrorCode()); Assert.assertEquals("Unknown exception", new QueryInterruptedException(null).getErrorCode()); Assert.assertEquals("Unknown exception", new QueryInterruptedException(new ISE("Something bad!")).getErrorCode()); @@ -71,10 +69,6 @@ public class QueryInterruptedExceptionTest null, new QueryInterruptedException(new InterruptedException()).getMessage() ); - Assert.assertEquals( - null, - new QueryInterruptedException(new TimeoutException()).getMessage() - ); Assert.assertEquals( null, new QueryInterruptedException(null).getMessage() @@ -108,10 +102,6 @@ public class QueryInterruptedExceptionTest "java.lang.InterruptedException", new QueryInterruptedException(new InterruptedException()).getErrorClass() ); - Assert.assertEquals( - "java.util.concurrent.TimeoutException", - new QueryInterruptedException(new TimeoutException()).getErrorClass() - ); Assert.assertEquals( "org.apache.druid.query.ResourceLimitExceededException", new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getErrorClass() @@ -162,10 +152,6 @@ public class QueryInterruptedExceptionTest "java.lang.InterruptedException", roundTrip(new QueryInterruptedException(new InterruptedException())).getErrorClass() ); - Assert.assertEquals( - "java.util.concurrent.TimeoutException", - roundTrip(new QueryInterruptedException(new TimeoutException())).getErrorClass() - ); Assert.assertEquals( null, roundTrip(new QueryInterruptedException(null)).getErrorClass() diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 70806e6e55d..11238f1b878 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -33,16 +33,15 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.InsufficientResourcesException; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; -import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.ResourceLimitExceededException; import 
org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; -import org.hamcrest.CoreMatchers; import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; @@ -56,7 +55,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeoutException; @RunWith(Parameterized.class) public class GroupByQueryRunnerFailureTest @@ -182,8 +180,7 @@ public class GroupByQueryRunnerFailureTest @Test(timeout = 60_000L) public void testNotEnoughMergeBuffersOnQueryable() { - expectedException.expect(QueryInterruptedException.class); - expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expect(QueryTimeoutException.class); expectedException.expectMessage("Cannot acquire enough merge buffers"); final GroupByQuery query = GroupByQuery @@ -283,8 +280,7 @@ public class GroupByQueryRunnerFailureTest @Test(timeout = 60_000L) public void testTimeoutExceptionOnQueryable() { - expectedException.expect(QueryInterruptedException.class); - expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expect(QueryTimeoutException.class); final GroupByQuery query = GroupByQuery .builder() diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index cd94d816600..a93237e5afe 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.dimension.DimensionSpec; @@ -170,6 +171,63 @@ public class ConcurrentGrouperTest grouper.close(); } + @Test + public void testGrouperTimeout() throws Exception + { + final ConcurrentGrouper grouper = new ConcurrentGrouper<>( + bufferSupplier, + TEST_RESOURCE_HOLDER, + KEY_SERDE_FACTORY, + KEY_SERDE_FACTORY, + NULL_FACTORY, + new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, + 1024, + 0.7f, + 1, + new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024), + new DefaultObjectMapper(), + 8, + null, + false, + MoreExecutors.listeningDecorator(SERVICE), + 0, + true, + 1, + 4, + 8 + ); + grouper.init(); + + final int numRows = 1000; + + Future[] futures = new Future[8]; + + for (int i = 0; i < 8; i++) { + futures[i] = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + for (long i = 0; i < numRows; i++) { + grouper.aggregate(i); + } + } + }); + } + + for (Future eachFuture : futures) { + eachFuture.get(); + } + try { + grouper.iterator(true); + } + catch (RuntimeException e) { + Assert.assertTrue(e instanceof QueryTimeoutException); + 
Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode()); + } + grouper.close(); + } + static class TestResourceHolder extends ReferenceCountingResourceHolder { private boolean taken; diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java index 15712510db1..831db4da1ed 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java @@ -28,11 +28,12 @@ import com.google.common.primitives.Ints; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry; -import org.hamcrest.CoreMatchers; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -45,9 +46,8 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; -public class StreamingMergeSortedGrouperTest +public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest { @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -161,8 +161,7 @@ public class StreamingMergeSortedGrouperTest @Test public void testTimeout() { - expectedException.expect(RuntimeException.class); - expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expect(QueryTimeoutException.class); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final StreamingMergeSortedGrouper grouper = newGrouper(columnSelectorFactory, 100); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index 739420980ad..a08d23b216f 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -38,10 +38,14 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.filter.AndDimFilter; @@ -900,6 +904,25 @@ public class ScanQueryRunnerTest extends 
InitializedNullHandlingTest } } + @Test + public void testScanQueryTimeout() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .virtualColumns(EXPR_COLUMN) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)) + .build(); + ResponseContext responseContext = DefaultResponseContext.createEmpty(); + responseContext.add(ResponseContext.Key.TIMEOUT_AT, System.currentTimeMillis()); + try { + runner.run(QueryPlus.wrap(query), responseContext).toList(); + } + catch (RuntimeException e) { + Assert.assertTrue(e instanceof QueryTimeoutException); + Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode()); + } + } + private List>> toFullEvents(final String[]... valueSet) { return toEvents( diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index fa6c45e9af3..8bc5d78f4c8 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -48,6 +48,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; @@ -211,7 +212,7 @@ public class DirectDruidClient implements QueryRunner { final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); if (holder == null) { - throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); } final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength()); @@ -428,7 +429,7 @@ public class DirectDruidClient implements QueryRunner if (timeLeft <= 0) { String msg = StringUtils.format("Query[%s] url[%s] timed out.", query.getId(), url); setupResponseReadFailure(msg, null); - throw new RE(msg); + throw new QueryTimeoutException(msg); } else { return timeLeft; } @@ -451,7 +452,7 @@ public class DirectDruidClient implements QueryRunner long timeLeft = timeoutAt - System.currentTimeMillis(); if (timeLeft <= 0) { - throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url)); } future = httpClient.go( diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index 5fac43bf66f..7ec4f150301 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.ResourceLimitExceededException; import javax.annotation.Nullable; @@ -112,9 +113,9 @@ public class JsonParserIterator implements Iterator, Closeable catch (IOException e) { // check for timeout, a failure here might be related to a timeout, so lets just attribute it if (checkTimeout()) { - TimeoutException 
timeoutException = timeoutQuery(); + QueryTimeoutException timeoutException = timeoutQuery(); timeoutException.addSuppressed(e); - throw interruptQuery(timeoutException); + throw timeoutException; } else { throw interruptQuery(e); } @@ -155,14 +156,14 @@ public class JsonParserIterator implements Iterator, Closeable try { long timeLeftMillis = timeoutAt - System.currentTimeMillis(); if (checkTimeout(timeLeftMillis)) { - throw interruptQuery(timeoutQuery()); + throw timeoutQuery(); } InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get(); if (is != null) { jp = objectMapper.getFactory().createParser(is); } else if (checkTimeout()) { - throw interruptQuery(timeoutQuery()); + throw timeoutQuery(); } else { // if we haven't timed out completing the future, then this is the likely cause throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url)); @@ -180,15 +181,18 @@ public class JsonParserIterator implements Iterator, Closeable ); } } - catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) { + catch (IOException | InterruptedException | ExecutionException | CancellationException e) { throw interruptQuery(e); } + catch (TimeoutException e) { + throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out!", queryId), host); + } } } - private TimeoutException timeoutQuery() + private QueryTimeoutException timeoutQuery() { - return new TimeoutException(StringUtils.format("url[%s] timed out", url)); + return new QueryTimeoutException(StringUtils.nonStrictFormat("url[%s] timed out", url), host); } private QueryInterruptedException interruptQuery(Exception cause) diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index cf6cde5081c..a82bbf8c4e9 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -40,6 +40,7 @@ import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.context.ResponseContext; @@ -331,7 +332,7 @@ public class QueryLifecycle if (e != null) { statsMap.put("exception", e.toString()); log.noStackTrace().warn(e, "Exception while processing queryId [%s]", baseQuery.getId()); - if (e instanceof QueryInterruptedException) { + if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) { // Mimic behavior from QueryResource, where this code was originally taken from. 
statsMap.put("interrupted", true); statsMap.put("reason", e.toString()); diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index cb8746c2d88..27ac513bd6e 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -44,6 +44,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.TruncatedResponseContextException; @@ -330,6 +331,11 @@ public class QueryResource implements QueryCountStatsProvider queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1); return ioReaderWriter.gotError(e); } + catch (QueryTimeoutException timeout) { + interruptedQueryCount.incrementAndGet(); + queryLifecycle.emitLogsAndMetrics(timeout, req.getRemoteAddr(), -1); + return ioReaderWriter.gotTimeout(timeout); + } catch (QueryCapacityExceededException cap) { failedQueryCount.incrementAndGet(); queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1); @@ -465,6 +471,17 @@ public class QueryResource implements QueryCountStatsProvider .build(); } + Response gotTimeout(QueryTimeoutException e) throws IOException + { + return Response.status(QueryTimeoutException.STATUS_CODE) + .type(contentType) + .entity( + newOutputWriter(null, null, false) + .writeValueAsBytes(e) + ) + .build(); + } + Response gotLimited(QueryCapacityExceededException e) throws IOException { return Response.status(QueryCapacityExceededException.STATUS_CODE) diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 6c50249693f..c38bcd4ce9a 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -40,6 +40,7 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.ReflectionQueryToolChestWarehouse; import org.apache.druid.query.Result; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; @@ -93,8 +94,8 @@ public class DirectDruidClientTest { httpClient = EasyMock.createMock(HttpClient.class); serverSelector = new ServerSelector( - dataSegment, - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + dataSegment, + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) ); client = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), @@ -354,7 +355,7 @@ public class DirectDruidClientTest in ); - QueryInterruptedException actualException = null; + QueryTimeoutException actualException = null; try { out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); Thread.sleep(250); @@ -362,7 +363,7 @@ public class DirectDruidClientTest out.close(); results.toList(); } - catch (QueryInterruptedException e) { + catch (QueryTimeoutException e) { actualException = e; } Assert.assertNotNull(actualException); @@ -399,16 +400,16 @@ public 
class DirectDruidClientTest Sequence results = client.run(QueryPlus.wrap(query)); - QueryInterruptedException actualException = null; + QueryTimeoutException actualException = null; try { results.toList(); } - catch (QueryInterruptedException e) { + catch (QueryTimeoutException e) { actualException = e; } Assert.assertNotNull(actualException); Assert.assertEquals("Query timeout", actualException.getErrorCode()); - Assert.assertEquals("Timeout waiting for task.", actualException.getMessage()); + Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); EasyMock.verify(httpClient); } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 0a24b0edb8a..5c501077ab2 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -41,6 +41,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.Result; @@ -629,6 +630,62 @@ public class QueryResourceTest ); } + @Test + public void testQueryTimeoutException() throws Exception + { + final QuerySegmentWalker timeoutSegmentWalker = new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) + { + throw new QueryTimeoutException(); + } + + @Override + public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) + { + return getQueryRunnerForIntervals(null, null); + } + }; + + final QueryResource timeoutQueryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + timeoutSegmentWalker, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) + ), + JSON_MAPPER, + JSON_MAPPER, + queryScheduler, + new AuthConfig(), + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE + ); + expectPermissiveHappyPathAuth(); + Response response = timeoutQueryResource.doPost( + new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest + ); + Assert.assertNotNull(response); + Assert.assertEquals(QueryTimeoutException.STATUS_CODE, response.getStatus()); + QueryTimeoutException ex; + try { + ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryTimeoutException.class); + } + catch (IOException e) { + throw new RuntimeException(e); + } + Assert.assertEquals("Query Timed Out!", ex.getMessage()); + Assert.assertEquals(QueryTimeoutException.ERROR_CODE, ex.getErrorCode()); + } + @Test(timeout = 60_000L) public void testSecuredCancelQuery() throws Exception { diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index 4c9135941ca..9453d71c514 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.logger.Logger; import 
org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.log.RequestLogger; @@ -340,7 +341,7 @@ public class SqlLifecycle if (e != null) { statsMap.put("exception", e.toString()); - if (e instanceof QueryInterruptedException) { + if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) { statsMap.put("interrupted", true); statsMap.put("reason", e.toString()); } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 1c950c94819..03173eea3ba 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.server.QueryCapacityExceededException; import org.apache.druid.server.security.ForbiddenException; @@ -182,6 +183,10 @@ public class SqlResource lifecycle.emitLogsAndMetrics(unsupported, remoteAddr, -1); return Response.status(QueryUnsupportedException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(unsupported)).build(); } + catch (QueryTimeoutException timeout) { + lifecycle.emitLogsAndMetrics(timeout, remoteAddr, -1); + return Response.status(QueryTimeoutException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(timeout)).build(); + } catch (ForbiddenException e) { throw e; // let ForbiddenExceptionMapper handle this } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index cc3bce3f6b2..123277e491e 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -38,9 +38,11 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.server.QueryCapacityExceededException; @@ -801,6 +803,25 @@ public class SqlResourceTest extends CalciteTestBase Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size()); } + @Test + public void testQueryTimeoutException() throws Exception + { + Map queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1); + final QueryException timeoutException = doPost( + new SqlQuery( + "SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC", + ResultFormat.OBJECT, + false, + queryContext, + null + ) + ).lhs; 
+ Assert.assertNotNull(timeoutException); + Assert.assertEquals(timeoutException.getErrorCode(), QueryTimeoutException.ERROR_CODE); + Assert.assertEquals(timeoutException.getErrorClass(), QueryTimeoutException.class.getName()); + + } + @SuppressWarnings("unchecked") private void checkSqlRequestLog(boolean success) {
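
Editor's note: the change set above repeats one pattern in several places (ChainedExecutionQueryRunner, GroupByMergedQueryRunner, GroupByMergingQueryRunnerV2, JsonParserIterator): a low-level java.util.concurrent.TimeoutException is translated into the new QueryTimeoutException, which QueryResource and SqlResource then serialize to a JSON error body with QueryTimeoutException.STATUS_CODE (504) instead of the generic HTTP 500. The sketch below illustrates that translation and the Jackson round trip in isolation. It is not part of the patch; the QueryTimeoutExample class, the getWithQueryTimeout helper, the never-completing CompletableFuture stand-in, and the "example-query-id" value are hypothetical scaffolding, while QueryTimeoutException, its constructors, StringUtils.nonStrictFormat, and the serde behavior are taken from the diff and QueryTimeoutExceptionTest above.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryTimeoutException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class QueryTimeoutExample
{
  // Hypothetical helper mirroring the runners in this patch: a TimeoutException from
  // Future.get() is cancelled and rethrown as QueryTimeoutException carrying the query id.
  static <T> T getWithQueryTimeout(Future<T> future, long timeoutMillis, String queryId)
      throws InterruptedException, ExecutionException
  {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      future.cancel(true);
      throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", queryId));
    }
  }

  public static void main(String[] args) throws Exception
  {
    QueryTimeoutException caught = null;
    try {
      // A future that never completes stands in for a slow per-segment computation.
      getWithQueryTimeout(new CompletableFuture<Void>(), 10, "example-query-id");
    }
    catch (QueryTimeoutException e) {
      caught = e;
    }

    // As a QueryException subclass it round-trips through Jackson, which is how
    // QueryResource.gotTimeout() and SqlResource write the error body for the 504 response.
    ObjectMapper mapper = new ObjectMapper();
    byte[] body = mapper.writeValueAsBytes(caught);
    QueryTimeoutException roundTripped = mapper.readValue(body, QueryTimeoutException.class);

    System.out.println(roundTripped.getErrorCode());        // Query timeout
    System.out.println(roundTripped.getMessage());          // Query [example-query-id] timed out
    System.out.println(QueryTimeoutException.STATUS_CODE);  // 504
  }
}

Mapping timeouts to a dedicated exception and a 504 status, rather than wrapping them in QueryInterruptedException with a 500, lets clients and intermediaries distinguish a query that ran out of time from other server-side failures; that is the design choice the per-runner rethrows and the new gotTimeout handler implement.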