+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two different places.
+ *
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ *
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the child Exceptions must know that they are
+ * running inside code where the exception has not traveled across the wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ *
+ * As a corollary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ *
+ * If there is any need to do different logic based on the type of error that has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String. Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same semantics. The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make it more clear what options exist.
+ *
* QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
*/
public class QueryException extends RuntimeException implements SanitizableException
{
+ /**
+ * Error codes
+ */
+ public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+ public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context parse failed";
+ public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query capacity exceeded";
+ public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query interrupted";
+ // Note: the proper spelling is with a single "l", but the version with
+ // two "l"s is documented, so we can't change the text of the message.
+ public static final String QUERY_CANCELED_ERROR_CODE = "Query cancelled";
+ public static final String UNAUTHORIZED_ERROR_CODE = "Unauthorized request";
+ public static final String UNSUPPORTED_OPERATION_ERROR_CODE = "Unsupported operation";
+ public static final String TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE = "Truncated response context";
+ public static final String UNKNOWN_EXCEPTION_ERROR_CODE = "Unknown exception";
+ public static final String QUERY_TIMEOUT_ERROR_CODE = "Query timeout";
+ public static final String QUERY_UNSUPPORTED_ERROR_CODE = "Unsupported query";
+ public static final String RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = "Resource limit exceeded";
+ public static final String SQL_PARSE_FAILED_ERROR_CODE = "SQL parse failed";
+ public static final String PLAN_VALIDATION_FAILED_ERROR_CODE = "Plan validation failed";
+ public static final String SQL_QUERY_UNSUPPORTED_ERROR_CODE = "SQL query is unsupported";
+
+ public enum FailType
+ {
+ USER_ERROR(400),
+ UNAUTHORIZED(401),
+ CAPACITY_EXCEEDED(429),
+ UNKNOWN(500),
+ CANCELED(500),
+ QUERY_RUNTIME_FAILURE(500),
+ UNSUPPORTED(501),
+ TIMEOUT(504);
+
+ private final int expectedStatus;
+
+ FailType(int expectedStatus)
+ {
+ this.expectedStatus = expectedStatus;
+ }
+
+ public int getExpectedStatus()
+ {
+ return expectedStatus;
+ }
+ }
+
+ public static FailType fromErrorCode(String errorCode)
+ {
+ if (errorCode == null) {
+ return FailType.UNKNOWN;
+ }
+
+ switch (errorCode) {
+ case QUERY_CANCELED_ERROR_CODE:
+ return FailType.CANCELED;
+
+ // These error codes are generally expected to come from a QueryInterruptedException
+ case QUERY_INTERRUPTED_ERROR_CODE:
+ case UNSUPPORTED_OPERATION_ERROR_CODE:
+ case UNKNOWN_EXCEPTION_ERROR_CODE:
+ case TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE:
+ return FailType.QUERY_RUNTIME_FAILURE;
+ case UNAUTHORIZED_ERROR_CODE:
+ return FailType.UNAUTHORIZED;
+
+ case QUERY_CAPACITY_EXCEEDED_ERROR_CODE:
+ return FailType.CAPACITY_EXCEEDED;
+ case QUERY_TIMEOUT_ERROR_CODE:
+ return FailType.TIMEOUT;
+
+ // These error codes are expected to come from BadQueryExceptions
+ case JSON_PARSE_ERROR_CODE:
+ case BAD_QUERY_CONTEXT_ERROR_CODE:
+ case RESOURCE_LIMIT_EXCEEDED_ERROR_CODE:
+ // And these ones from the SqlPlanningException which are also BadQueryExceptions
+ case SQL_PARSE_FAILED_ERROR_CODE:
+ case PLAN_VALIDATION_FAILED_ERROR_CODE:
+ case SQL_QUERY_UNSUPPORTED_ERROR_CODE:
+ return FailType.USER_ERROR;
+ case QUERY_UNSUPPORTED_ERROR_CODE:
+ return FailType.UNSUPPORTED;
+ default:
+ return FailType.UNKNOWN;
+ }
+ }
+
+ /**
+ * Implementation
+ */
private final String errorCode;
private final String errorClass;
private final String host;
@@ -48,7 +158,6 @@ public class QueryException extends RuntimeException implements SanitizableExcep
this.host = host;
}
- @VisibleForTesting
@JsonCreator
public QueryException(
@JsonProperty("error") @Nullable String errorCode,
@@ -105,4 +214,9 @@ public class QueryException extends RuntimeException implements SanitizableExcep
{
return new QueryException(errorCode, errorMessageTransformFunction.apply(getMessage()), null, null);
}
+
+ public FailType getFailType()
+ {
+ return fromErrorCode(errorCode);
+ }
}
diff --git a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
index d3626e9c5e6..7bd4924000a 100644
--- a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
+++ b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
@@ -36,7 +36,6 @@ import javax.annotation.Nullable;
public class QueryTimeoutException extends QueryException
{
private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
- public static final String ERROR_CODE = "Query timeout";
public static final String ERROR_MESSAGE = "Query Timed Out!";
public static final int STATUS_CODE = 504;
@@ -53,16 +52,16 @@ public class QueryTimeoutException extends QueryException
public QueryTimeoutException()
{
- super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
+ super(QUERY_TIMEOUT_ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ super(QUERY_TIMEOUT_ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage, String host)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
+ super(QUERY_TIMEOUT_ERROR_CODE, errorMessage, ERROR_CLASS, host);
}
}
diff --git a/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
index 51ff763762c..446e94b9667 100644
--- a/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
+++ b/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
@@ -19,17 +19,12 @@
package org.apache.druid.query;
+import org.apache.druid.query.QueryException.FailType;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
-@RunWith(MockitoJUnitRunner.class)
public class QueryExceptionTest
{
private static final String ERROR_CODE = "error code";
@@ -38,36 +33,72 @@ public class QueryExceptionTest
private static final String ERROR_MESSAGE_ORIGINAL = "aaaa";
private static final String ERROR_MESSAGE_TRANSFORMED = "bbbb";
- @Mock
- private Function trasformFunction;
-
@Test
public void testSanitizeWithTransformFunctionReturningNull()
{
- Mockito.when(trasformFunction.apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL))).thenReturn(null);
QueryException queryException = new QueryException(ERROR_CODE, ERROR_MESSAGE_ORIGINAL, ERROR_CLASS, HOST);
- QueryException actual = queryException.sanitize(trasformFunction);
+
+ AtomicLong callCount = new AtomicLong(0);
+ QueryException actual = queryException.sanitize(s -> {
+ callCount.incrementAndGet();
+ Assert.assertEquals(ERROR_MESSAGE_ORIGINAL, s);
+ return null;
+ });
+
Assert.assertNotNull(actual);
Assert.assertEquals(actual.getErrorCode(), ERROR_CODE);
Assert.assertNull(actual.getMessage());
Assert.assertNull(actual.getHost());
Assert.assertNull(actual.getErrorClass());
- Mockito.verify(trasformFunction).apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL));
- Mockito.verifyNoMoreInteractions(trasformFunction);
+ Assert.assertEquals(1, callCount.get());
}
@Test
public void testSanitizeWithTransformFunctionReturningNewString()
{
- Mockito.when(trasformFunction.apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL))).thenReturn(ERROR_MESSAGE_TRANSFORMED);
QueryException queryException = new QueryException(ERROR_CODE, ERROR_MESSAGE_ORIGINAL, ERROR_CLASS, HOST);
- QueryException actual = queryException.sanitize(trasformFunction);
+
+ AtomicLong callCount = new AtomicLong(0);
+ QueryException actual = queryException.sanitize(s -> {
+ callCount.incrementAndGet();
+ Assert.assertEquals(ERROR_MESSAGE_ORIGINAL, s);
+ return ERROR_MESSAGE_TRANSFORMED;
+ });
+
Assert.assertNotNull(actual);
Assert.assertEquals(actual.getErrorCode(), ERROR_CODE);
Assert.assertEquals(actual.getMessage(), ERROR_MESSAGE_TRANSFORMED);
Assert.assertNull(actual.getHost());
Assert.assertNull(actual.getErrorClass());
- Mockito.verify(trasformFunction).apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL));
- Mockito.verifyNoMoreInteractions(trasformFunction);
+ Assert.assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testSanity()
+ {
+ expectFailTypeForCode(FailType.UNKNOWN, null);
+ expectFailTypeForCode(FailType.UNKNOWN, "Nobody knows me.");
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.UNKNOWN_EXCEPTION_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.JSON_PARSE_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.BAD_QUERY_CONTEXT_ERROR_CODE);
+ expectFailTypeForCode(FailType.CAPACITY_EXCEEDED, QueryException.QUERY_CAPACITY_EXCEEDED_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.QUERY_INTERRUPTED_ERROR_CODE);
+ expectFailTypeForCode(FailType.CANCELED, QueryException.QUERY_CANCELED_ERROR_CODE);
+ expectFailTypeForCode(FailType.UNAUTHORIZED, QueryException.UNAUTHORIZED_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.UNSUPPORTED_OPERATION_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE);
+ expectFailTypeForCode(FailType.TIMEOUT, QueryException.QUERY_TIMEOUT_ERROR_CODE);
+ expectFailTypeForCode(FailType.UNSUPPORTED, QueryException.QUERY_UNSUPPORTED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.SQL_PARSE_FAILED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.PLAN_VALIDATION_FAILED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.SQL_QUERY_UNSUPPORTED_ERROR_CODE);
+ }
+
+ private void expectFailTypeForCode(FailType expected, String code)
+ {
+ QueryException exception = new QueryException(new Exception(), code, "java.lang.Exception", "test");
+
+ Assert.assertEquals(code, expected, exception.getFailType());
}
}
diff --git a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
index ab187a1fb42..acc6b44d58f 100644
--- a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
+++ b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
@@ -19,7 +19,10 @@
package org.apache.druid.query;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import org.junit.Assert;
import org.junit.Test;
@@ -30,7 +33,26 @@ public class QueryTimeoutExceptionTest
@Test
public void testSerde() throws IOException
{
- final ObjectMapper mapper = new ObjectMapper();
+ // We re-create the configuration from DefaultObjectMapper here because this is in `core` and
+ // DefaultObjectMapper is in `processing`. Hopefully that distinction disappears at some point
+ // in time, but it exists today and moving things one way or the other quickly turns into just
+ // chunking it all together.
+ final ObjectMapper mapper = new ObjectMapper()
+ {
+ {
+ configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+ // See https://github.com/FasterXML/jackson-databind/issues/170
+ // configure(MapperFeature.AUTO_DETECT_CREATORS, false);
+ configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+ configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+ configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+ configure(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS, false);
+ configure(SerializationFeature.INDENT_OUTPUT, false);
+ configure(SerializationFeature.FLUSH_AFTER_WRITE_VALUE, false);
+ }
+ };
+
QueryTimeoutException timeoutException = mapper.readValue(
mapper.writeValueAsBytes(new QueryTimeoutException()),
QueryTimeoutException.class
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
index ad8dd3cb11f..d5bc1e30204 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
@@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.QueryException;
-import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
@@ -107,10 +106,10 @@ public class ITSqlCancelTest
throw new ISE("Query is not canceled after cancel request");
}
QueryException queryException = jsonMapper.readValue(queryResponse.getContent(), QueryException.class);
- if (!QueryInterruptedException.QUERY_CANCELED.equals(queryException.getErrorCode())) {
+ if (!"Query cancelled".equals(queryException.getErrorCode())) {
throw new ISE(
"Expected error code [%s], actual [%s]",
- QueryInterruptedException.QUERY_CANCELED,
+ "Query cancelled",
queryException.getErrorCode()
);
}
@@ -138,7 +137,11 @@ public class ITSqlCancelTest
final StatusResponseHolder queryResponse = queryResponseFuture.get(30, TimeUnit.SECONDS);
if (!queryResponse.getStatus().equals(HttpResponseStatus.OK)) {
- throw new ISE("Cancel request failed with status[%s] and content[%s]", queryResponse.getStatus(), queryResponse.getContent());
+ throw new ISE(
+ "Cancel request failed with status[%s] and content[%s]",
+ queryResponse.getStatus(),
+ queryResponse.getContent()
+ );
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
index 8be18edf18a..47ba3c90777 100644
--- a/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
+++ b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
@@ -25,12 +25,11 @@ import com.fasterxml.jackson.core.JsonParseException;
public class BadJsonQueryException extends BadQueryException
{
- public static final String ERROR_CODE = "Json parse failed";
public static final String ERROR_CLASS = JsonParseException.class.getName();
public BadJsonQueryException(JsonParseException e)
{
- this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
+ this(JSON_PARSE_ERROR_CODE, e.getMessage(), ERROR_CLASS);
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java b/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
index 29f63b1f40e..cbfb0ca410c 100644
--- a/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
+++ b/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
@@ -24,17 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class BadQueryContextException extends BadQueryException
{
- public static final String ERROR_CODE = "Query context parse failed";
public static final String ERROR_CLASS = BadQueryContextException.class.getName();
- public BadQueryContextException(Exception e)
- {
- this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
- }
-
public BadQueryContextException(String msg)
{
- this(ERROR_CODE, msg, ERROR_CLASS);
+ this(BAD_QUERY_CONTEXT_ERROR_CODE, msg, ERROR_CLASS);
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
index f62eb9166d8..694fbb780cb 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
@@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
*
When the query is rejected by QueryScheduler.
*
When the query cannot acquire enough merge buffers for groupBy v2
*
- *
+ *
* As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
*/
@@ -43,17 +43,16 @@ public class QueryCapacityExceededException extends QueryException
private static final String LANE_ERROR_MESSAGE_TEMPLATE =
"Too many concurrent queries for lane '%s', query capacity of %s exceeded. Please try your query again later.";
private static final String ERROR_CLASS = QueryCapacityExceededException.class.getName();
- public static final String ERROR_CODE = "Query capacity exceeded";
public static final int STATUS_CODE = 429;
public QueryCapacityExceededException(int capacity)
{
- super(ERROR_CODE, makeTotalErrorMessage(capacity), ERROR_CLASS, null);
+ super(QUERY_CAPACITY_EXCEEDED_ERROR_CODE, makeTotalErrorMessage(capacity), ERROR_CLASS, null);
}
public QueryCapacityExceededException(String lane, int capacity)
{
- super(ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
+ super(QUERY_CAPACITY_EXCEEDED_ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
}
/**
@@ -62,7 +61,12 @@ public class QueryCapacityExceededException extends QueryException
*/
public static QueryCapacityExceededException withErrorMessageAndResolvedHost(String errorMessage)
{
- return new QueryCapacityExceededException(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ return new QueryCapacityExceededException(
+ QUERY_CAPACITY_EXCEEDED_ERROR_CODE,
+ errorMessage,
+ ERROR_CLASS,
+ resolveHostname()
+ );
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
index ae67039242f..91760aa7c10 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CancellationException;
/**
* Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually
* used on the client side for *all* kinds of failed queries.
- *
+ *
* Fields:
* - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception"
* represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized
@@ -37,21 +37,12 @@ import java.util.concurrent.CancellationException;
* - "errorMessage" is the toString of the wrapped exception
* - "errorClass" is the class of the wrapped exception
* - "host" is the host that the errorCode occurred on
- *
+ *
* The QueryResource is expected to emit the JSON form of this object when errors happen, and the DirectDruidClient
* deserializes and wraps them.
*/
public class QueryInterruptedException extends QueryException
{
- public static final String QUERY_INTERRUPTED = "Query interrupted";
- // Note: the proper spelling is with a single "l", but the version with
- // two "l"s is documented, we can't change the text of the message.
- public static final String QUERY_CANCELED = "Query cancelled";
- public static final String UNAUTHORIZED = "Unauthorized request";
- public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
- public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context";
- public static final String UNKNOWN_EXCEPTION = "Unknown exception";
-
@JsonCreator
public QueryInterruptedException(
@JsonProperty("error") @Nullable String errorCode,
@@ -96,15 +87,15 @@ public class QueryInterruptedException extends QueryException
if (e instanceof QueryInterruptedException) {
return ((QueryInterruptedException) e).getErrorCode();
} else if (e instanceof InterruptedException) {
- return QUERY_INTERRUPTED;
+ return QUERY_INTERRUPTED_ERROR_CODE;
} else if (e instanceof CancellationException) {
- return QUERY_CANCELED;
+ return QUERY_CANCELED_ERROR_CODE;
} else if (e instanceof UnsupportedOperationException) {
- return UNSUPPORTED_OPERATION;
+ return UNSUPPORTED_OPERATION_ERROR_CODE;
} else if (e instanceof TruncatedResponseContextException) {
- return TRUNCATED_RESPONSE_CONTEXT;
+ return TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE;
} else {
- return UNKNOWN_EXCEPTION;
+ return UNKNOWN_EXCEPTION_ERROR_CODE;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
index bde1f9d14e1..81d82a94871 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
@@ -29,14 +29,13 @@ import javax.annotation.Nullable;
* This exception is for the query engine to surface when a query cannot be run. This can be due to the
* following reasons: 1) The query is not supported yet. 2) The query is not something Druid would ever supports.
* For these cases, the exact causes and details should also be documented in Druid user facing documents.
- *
+ *
* As a {@link QueryException} it is expected to be serialized to a json response with a proper HTTP error code
* ({@link #STATUS_CODE}).
*/
public class QueryUnsupportedException extends QueryException
{
private static final String ERROR_CLASS = QueryUnsupportedException.class.getName();
- public static final String ERROR_CODE = "Unsupported query";
public static final int STATUS_CODE = 501;
@JsonCreator
@@ -52,6 +51,6 @@ public class QueryUnsupportedException extends QueryException
public QueryUnsupportedException(String errorMessage)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ super(QUERY_UNSUPPORTED_ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
index 169d774cabf..a41699cf442 100644
--- a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
@@ -25,7 +25,7 @@ import org.apache.druid.java.util.common.StringUtils;
/**
* Exception indicating that an operation failed because it exceeded some configured resource limit.
- *
+ *
* This is a {@link BadQueryException} because it likely indicates a user's misbehavior when this exception is thrown.
* The resource limitations set by Druid cluster operators are typically less flexible than the parameters of
* a user query, so when a user query requires too many resources, the likely remedy is that the user query
@@ -33,8 +33,6 @@ import org.apache.druid.java.util.common.StringUtils;
*/
public class ResourceLimitExceededException extends BadQueryException
{
- public static final String ERROR_CODE = "Resource limit exceeded";
-
public static ResourceLimitExceededException withMessage(String message, Object... arguments)
{
return new ResourceLimitExceededException(StringUtils.nonStrictFormat(message, arguments));
@@ -47,7 +45,7 @@ public class ResourceLimitExceededException extends BadQueryException
public ResourceLimitExceededException(String message)
{
- this(ERROR_CODE, message, ResourceLimitExceededException.class.getName());
+ this(RESOURCE_LIMIT_EXCEEDED_ERROR_CODE, message, ResourceLimitExceededException.class.getName());
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index a943297d173..6727782cc40 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -80,18 +80,18 @@ import java.util.stream.Collectors;
*
Manages headers size by dropping fields when the header would get too
* large.
*
- *
+ *
* A result is that the information the context, when inspected by a calling
* query, may be incomplete if some of it was previously dropped by the
* called query.
*
*
API
- *
+ *
* The query profile needs to obtain the full, untruncated information. To do this
* it piggy-backs on the set operations to obtain the full value. To ensure this
* is possible, code that works with standard values should call the set (or add)
* functions provided which will do the needed map update.
- */
+ */
@PublicApi
public abstract class ResponseContext
{
@@ -118,7 +118,7 @@ public abstract class ResponseContext
/**
* Merges two values of type T.
- *
+ *
* This method may modify "oldValue" but must not modify "newValue".
*/
Object mergeValues(Object oldValue, Object newValue);
@@ -317,7 +317,8 @@ public abstract class ResponseContext
true, true,
new TypeReference>()
{
- })
+ }
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -334,14 +335,15 @@ public abstract class ResponseContext
*/
public static final Key UNCOVERED_INTERVALS_OVERFLOWED = new BooleanKey(
"uncoveredIntervalsOverflowed",
- true);
+ true
+ );
/**
* Map of most relevant query ID to remaining number of responses from query nodes.
* The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes,
* and is updated whenever they respond (@code DirectDruidClient). {@code RetryQueryRunner} uses this value to
* check if the {@link #MISSING_SEGMENTS} is valid.
- *
+ *
* Currently, the broker doesn't run subqueries in parallel, the remaining number of responses will be updated
* one by one per subquery. However, since it can be parallelized to run subqueries simultaneously, we store them
* in a ConcurrentHashMap.
@@ -351,7 +353,8 @@ public abstract class ResponseContext
public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey(
"remainingResponsesFromQueryServers",
false, true,
- Object.class)
+ Object.class
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -361,7 +364,8 @@ public abstract class ResponseContext
final NonnullPair pair = (NonnullPair) idAndNumResponses;
map.compute(
pair.lhs,
- (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs);
+ (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
+ );
return map;
}
};
@@ -372,7 +376,10 @@ public abstract class ResponseContext
public static final Key MISSING_SEGMENTS = new AbstractKey(
"missingSegments",
true, true,
- new TypeReference>() {})
+ new TypeReference>()
+ {
+ }
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -396,7 +403,10 @@ public abstract class ResponseContext
public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
"queryTotalBytesGathered",
false, false,
- new TypeReference() {})
+ new TypeReference()
+ {
+ }
+ )
{
@Override
public Object mergeValues(Object oldValue, Object newValue)
@@ -410,7 +420,8 @@ public abstract class ResponseContext
*/
public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
"queryFailTime",
- false);
+ false
+ );
/**
* This variable indicates when a running query should be expired,
@@ -418,17 +429,19 @@ public abstract class ResponseContext
*/
public static final Key TIMEOUT_AT = new LongKey(
"timeoutAt",
- false);
+ false
+ );
/**
* The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
- *
+ *
* Named "count" for backwards compatibility with older data servers that still send this, even though it's now
* marked as internal.
*/
public static final Key NUM_SCANNED_ROWS = new CounterKey(
"count",
- false);
+ false
+ );
/**
* The total CPU time for threads related to Sequence processing of the query.
@@ -437,14 +450,16 @@ public abstract class ResponseContext
*/
public static final Key CPU_CONSUMED_NANOS = new CounterKey(
"cpuConsumed",
- false);
+ false
+ );
/**
* Indicates if a {@link ResponseContext} was truncated during serialization.
*/
public static final Key TRUNCATED = new BooleanKey(
"truncated",
- false);
+ false
+ );
/**
* One and only global list of keys. This is a semi-constant: it is mutable
@@ -461,20 +476,21 @@ public abstract class ResponseContext
private final ConcurrentMap registeredKeys = new ConcurrentSkipListMap<>();
static {
- instance().registerKeys(new Key[]
- {
- UNCOVERED_INTERVALS,
- UNCOVERED_INTERVALS_OVERFLOWED,
- REMAINING_RESPONSES_FROM_QUERY_SERVERS,
- MISSING_SEGMENTS,
- ETAG,
- QUERY_TOTAL_BYTES_GATHERED,
- QUERY_FAIL_DEADLINE_MILLIS,
- TIMEOUT_AT,
- NUM_SCANNED_ROWS,
- CPU_CONSUMED_NANOS,
- TRUNCATED,
- });
+ instance().registerKeys(
+ new Key[]{
+ UNCOVERED_INTERVALS,
+ UNCOVERED_INTERVALS_OVERFLOWED,
+ REMAINING_RESPONSES_FROM_QUERY_SERVERS,
+ MISSING_SEGMENTS,
+ ETAG,
+ QUERY_TOTAL_BYTES_GATHERED,
+ QUERY_FAIL_DEADLINE_MILLIS,
+ TIMEOUT_AT,
+ NUM_SCANNED_ROWS,
+ CPU_CONSUMED_NANOS,
+ TRUNCATED,
+ }
+ );
}
/**
@@ -701,8 +717,10 @@ public abstract class ResponseContext
public void addRemainingResponse(String id, int count)
{
- addValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
- new NonnullPair<>(id, count));
+ addValue(
+ Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
+ new NonnullPair<>(id, count)
+ );
}
public void addMissingSegments(List descriptors)
@@ -820,7 +838,6 @@ public abstract class ResponseContext
*
* @param node {@link ArrayNode} which elements are being removed.
* @param target the number of chars need to be removed.
- *
* @return the number of removed chars.
*/
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int target)
@@ -851,7 +868,7 @@ public abstract class ResponseContext
private final String truncatedResult;
private final String fullResult;
- SerializationResult(@Nullable String truncatedResult, String fullResult)
+ public SerializationResult(@Nullable String truncatedResult, String fullResult)
{
this.truncatedResult = truncatedResult;
this.fullResult = fullResult;
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 97c772ed192..e14e24c2231 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -168,7 +168,11 @@ public class JsonParserIterator implements Iterator, Closeable
} else if (checkTimeout()) {
throw timeoutQuery();
} else {
- // TODO: NettyHttpClient should check the actual cause of the failure and set it in the future properly.
+ // The InputStream is null and we have not timed out; there might be multiple reasons why we could hit this
+ // condition, but we guess that we are hitting it because of scatter-gather bytes. It would be better to be
+ // more explicit about why errors are happening than guessing, but this comment is being rewritten from a
+ // T-O-D-O, so the intent is just to document this better rather than do all of the logic to fix it. If/when
+ // this exception gets thrown for other reasons, it would be great to document what those other reasons are.
throw ResourceLimitExceededException.withMessage(
"Possibly max scatter-gather bytes limit reached while reading from url[%s].",
url
@@ -207,11 +211,11 @@ public class JsonParserIterator implements Iterator, Closeable
/**
* Converts the given exception to a proper type of {@link QueryException}.
* The use cases of this method are:
- *
+ *
* - All non-QueryExceptions are wrapped with {@link QueryInterruptedException}.
* - The QueryException from {@link DirectDruidClient} is converted to a more specific type of QueryException
- * based on {@link QueryException#getErrorCode()}. During conversion, {@link QueryException#host} is overridden
- * by {@link #host}.
+ * based on {@link QueryException#getErrorCode()}. During conversion, {@link QueryException#host} is overridden
+ * by {@link #host}.
*/
private QueryException convertException(Throwable cause)
{
@@ -219,9 +223,9 @@ public class JsonParserIterator implements Iterator, Closeable
if (cause instanceof QueryException) {
final QueryException queryException = (QueryException) cause;
if (queryException.getErrorCode() == null) {
- // errorCode should not be null now, but maybe could be null in the past..
+ // errorCode should not be null now, but maybe could be null in the past...
return new QueryInterruptedException(
- queryException.getErrorCode(),
+ QueryException.UNKNOWN_EXCEPTION_ERROR_CODE,
queryException.getMessage(),
queryException.getErrorClass(),
host
@@ -229,32 +233,37 @@ public class JsonParserIterator implements Iterator, Closeable
}
// Note: this switch clause is to restore the 'type' information of QueryExceptions which is lost during
- // JSON serialization. This is not a good way to restore the correct exception type. Rather, QueryException
- // should store its type when it is serialized, so that we can know the exact type when it is deserialized.
+ // JSON serialization. As documented on the QueryException class, the errorCode of QueryException is the only
+ // way to differentiate the cause of the exception. This code does not cover all possible exceptions that
+ // could come up and so, likely, doesn't produce exceptions reliably. The only safe way to catch and interact
+ // with a QueryException is to catch QueryException and check its errorCode. In some future code change, we
+ // should likely remove this switch entirely, but when we do that, we need to make sure to also adjust any
+ // points in the code that are catching the specific child Exceptions to instead catch QueryException and
+ // check the errorCode.
switch (queryException.getErrorCode()) {
// The below is the list of exceptions that can be thrown in historicals and propagated to the broker.
- case QueryTimeoutException.ERROR_CODE:
+ case QueryException.QUERY_TIMEOUT_ERROR_CODE:
return new QueryTimeoutException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case QueryCapacityExceededException.ERROR_CODE:
+ case QueryException.QUERY_CAPACITY_EXCEEDED_ERROR_CODE:
return new QueryCapacityExceededException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case QueryUnsupportedException.ERROR_CODE:
+ case QueryException.QUERY_UNSUPPORTED_ERROR_CODE:
return new QueryUnsupportedException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case ResourceLimitExceededException.ERROR_CODE:
+ case QueryException.RESOURCE_LIMIT_EXCEEDED_ERROR_CODE:
return new ResourceLimitExceededException(
queryException.getErrorCode(),
queryException.getMessage(),
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 743ca9e60ba..84e6acf24c1 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
@@ -30,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
-import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.guice.LazySingleton;
@@ -38,20 +38,13 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BadJsonQueryException;
-import org.apache.druid.query.BadQueryException;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.TruncatedResponseContextException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Keys;
@@ -64,7 +57,9 @@ import org.apache.druid.server.security.ForbiddenException;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
+import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
@@ -72,12 +67,10 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -90,6 +83,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class QueryResource implements QueryCountStatsProvider
{
protected static final EmittingLogger log = new EmittingLogger(QueryResource.class);
+ public static final EmittingLogger NO_STACK_LOGGER = log.noStackTrace();
+
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
protected static final String APPLICATION_SMILE = "application/smile";
@@ -116,6 +111,7 @@ public class QueryResource implements QueryCountStatsProvider
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();
private final AtomicLong timedOutQueryCount = new AtomicLong();
+ private final QueryResourceQueryMetricCounter counter = new QueryResourceQueryMetricCounter();
@Inject
public QueryResource(
@@ -171,23 +167,28 @@ public class QueryResource implements QueryCountStatsProvider
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+ @Nullable
public Response doPost(
final InputStream in,
@QueryParam("pretty") final String pretty,
-
- // used to get request content-type,Accept header, remote address and auth-related headers
@Context final HttpServletRequest req
) throws IOException
{
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
- final ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null);
+ final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty != null);
final String currThreadName = Thread.currentThread().getName();
try {
- final Query> query = readQuery(req, in, ioReaderWriter);
+ final Query> query;
+ try {
+ query = readQuery(req, in, io);
+ }
+ catch (QueryException e) {
+ return io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(), e);
+ }
+
queryLifecycle.initialize(query);
- final String queryId = queryLifecycle.getQueryId();
final String queryThreadName = queryLifecycle.threadName(currThreadName);
Thread.currentThread().setName(queryThreadName);
@@ -195,137 +196,88 @@ public class QueryResource implements QueryCountStatsProvider
log.debug("Got query [%s]", queryLifecycle.getQuery());
}
- final Access authResult = queryLifecycle.authorize(req);
+ final Access authResult;
+ try {
+ authResult = queryLifecycle.authorize(req);
+ }
+ catch (RuntimeException e) {
+ final QueryException qe;
+
+ if (e instanceof QueryException) {
+ qe = (QueryException) e;
+ } else {
+ qe = new QueryInterruptedException(e);
+ }
+
+ return io.getResponseWriter().buildNonOkResponse(qe.getFailType().getExpectedStatus(), qe);
+ }
+
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}
- final QueryResponse> queryResponse = queryLifecycle.execute();
- final Sequence> results = queryResponse.getResults();
- final ResponseContext responseContext = queryResponse.getResponseContext();
- final String prevEtag = getPreviousEtag(req);
-
- if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
- queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
- successfulQueryCount.incrementAndGet();
- return Response.notModified().build();
- }
-
- final Yielder> yielder = Yielders.each(results);
+ // We use an async context not because we are actually going to run this async, but because we want to delay
+ // the decision of what the response code should be until we have gotten the first few data points to return.
+ // Returning a Response object from this point forward requires that object to know the status code, which we
+ // don't actually know until we are in the accumulator, but if we try to return a Response object from the
+ // accumulator, we cannot properly stream results back, because the accumulator won't release control of the
+ // Response until it has consumed the underlying Sequence.
+ final AsyncContext asyncContext = req.startAsync();
try {
- final ObjectWriter jsonWriter = queryLifecycle.newOutputWriter(ioReaderWriter);
-
- Response.ResponseBuilder responseBuilder = Response
- .ok(
- new StreamingOutput()
- {
- @Override
- public void write(OutputStream outputStream) throws WebApplicationException
- {
- Exception e = null;
-
- CountingOutputStream os = new CountingOutputStream(outputStream);
- try {
- // json serializer will always close the yielder
- jsonWriter.writeValue(os, yielder);
-
- os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
- os.close();
- }
- catch (Exception ex) {
- e = ex;
- log.noStackTrace().error(ex, "Unable to send query response.");
- throw new RuntimeException(ex);
- }
- finally {
- Thread.currentThread().setName(currThreadName);
-
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), os.getCount());
-
- if (e == null) {
- successfulQueryCount.incrementAndGet();
- } else {
- failedQueryCount.incrementAndGet();
- }
- }
- }
- },
- ioReaderWriter.getResponseWriter().getResponseType()
- )
- .header(QUERY_ID_RESPONSE_HEADER, queryId);
-
- attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper,
- responseContextConfig, selfNode
- );
-
- return responseBuilder.build();
- }
- catch (QueryException e) {
- // make sure to close yielder if anything happened before starting to serialize the response.
- yielder.close();
- throw e;
- }
- catch (Exception e) {
- // make sure to close yielder if anything happened before starting to serialize the response.
- yielder.close();
- throw new RuntimeException(e);
+ new QueryResourceQueryResultPusher(req, queryLifecycle, io, (HttpServletResponse) asyncContext.getResponse())
+ .push();
}
finally {
- // do not close yielder here, since we do not want to close the yielder prior to
- // StreamingOutput having iterated over all the results
+ asyncContext.complete();
}
}
- catch (QueryInterruptedException e) {
- interruptedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotError(e);
- }
- catch (QueryTimeoutException timeout) {
- timedOutQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(timeout, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotTimeout(timeout);
- }
- catch (QueryCapacityExceededException cap) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotLimited(cap);
- }
- catch (QueryUnsupportedException unsupported) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(unsupported, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotUnsupported(unsupported);
- }
- catch (BadQueryException e) {
- interruptedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotBadQuery(e);
- }
- catch (ForbiddenException e) {
- // don't do anything for an authorization failure, ForbiddenExceptionMapper will catch this later and
- // send an error response if this is thrown.
- throw e;
- }
catch (Exception e) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
+ if (e instanceof ForbiddenException && !req.isAsyncStarted()) {
+ // We can only pass through the Forbidden exception if we haven't started async yet.
+ throw e;
+ }
+ log.warn(e, "Uncaught exception from query processing. This should be caught and handled directly.");
- log.noStackTrace()
- .makeAlert(e, "Exception handling request")
- .addData(
- "query",
- queryLifecycle.getQuery() != null
- ? jsonMapper.writeValueAsString(queryLifecycle.getQuery())
- : "unparseable query"
- )
- .addData("peer", req.getRemoteAddr())
- .emit();
-
- return ioReaderWriter.getResponseWriter().gotError(e);
+ // Just fall back to the async context.
+ AsyncContext asyncContext = req.startAsync();
+ try {
+ final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+ // If the response is already committed, we have actually begun processing the request and sending data back,
+ // so the best we can do is complete the async context in the finally block and hope for the best.
+ if (!response.isCommitted()) {
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ response.setContentType(MediaType.APPLICATION_JSON);
+ try (OutputStream out = response.getOutputStream()) {
+ final QueryException responseException = new QueryException(
+ QueryException.UNKNOWN_EXCEPTION_ERROR_CODE,
+ "Unhandled exception made it to the top",
+ e.getClass().getName(),
+ req.getRemoteHost()
+ );
+ out.write(jsonMapper.writeValueAsBytes(responseException));
+ }
+ }
+ }
+ finally {
+ asyncContext.complete();
+ }
}
finally {
Thread.currentThread().setName(currThreadName);
}
+ return null;
+ }
+
+ public interface QueryMetricCounter
+ {
+ void incrementSuccess();
+
+ void incrementFailed();
+
+ void incrementInterrupted();
+
+ void incrementTimedOut();
}
public static void attachResponseContextToHttpResponse(
@@ -416,16 +368,20 @@ public class QueryResource implements QueryCountStatsProvider
// response type defaults to Content-Type if 'Accept' header not provided
String responseType = Strings.isNullOrEmpty(acceptHeader) ? requestType : acceptHeader;
- boolean isRequestSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || APPLICATION_SMILE.equals(requestType);
- boolean isResponseSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(responseType) || APPLICATION_SMILE.equals(responseType);
+ boolean isRequestSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || APPLICATION_SMILE.equals(
+ requestType);
+ boolean isResponseSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(responseType)
+ || APPLICATION_SMILE.equals(responseType);
return new ResourceIOReaderWriter(
isRequestSmile ? smileMapper : jsonMapper,
- new ResourceIOWriter(isResponseSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON,
- isResponseSmile ? smileMapper : jsonMapper,
- isResponseSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
- pretty
- ));
+ new ResourceIOWriter(
+ isResponseSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON,
+ isResponseSmile ? smileMapper : jsonMapper,
+ isResponseSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
+ pretty
+ )
+ );
}
protected static class ResourceIOReaderWriter
@@ -504,26 +460,6 @@ public class QueryResource implements QueryCountStatsProvider
);
}
- Response gotTimeout(QueryTimeoutException e) throws IOException
- {
- return buildNonOkResponse(QueryTimeoutException.STATUS_CODE, e);
- }
-
- Response gotLimited(QueryCapacityExceededException e) throws IOException
- {
- return buildNonOkResponse(QueryCapacityExceededException.STATUS_CODE, e);
- }
-
- Response gotUnsupported(QueryUnsupportedException e) throws IOException
- {
- return buildNonOkResponse(QueryUnsupportedException.STATUS_CODE, e);
- }
-
- Response gotBadQuery(BadQueryException e) throws IOException
- {
- return buildNonOkResponse(BadQueryException.STATUS_CODE, e);
- }
-
Response buildNonOkResponse(int status, Exception e) throws JsonProcessingException
{
return Response.status(status)
@@ -565,4 +501,142 @@ public class QueryResource implements QueryCountStatsProvider
builder.header(HEADER_ETAG, entityTag);
}
}
+
+ private class QueryResourceQueryMetricCounter implements QueryMetricCounter
+ {
+ @Override
+ public void incrementSuccess()
+ {
+ successfulQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementFailed()
+ {
+ failedQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementInterrupted()
+ {
+ interruptedQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementTimedOut()
+ {
+ timedOutQueryCount.incrementAndGet();
+ }
+ }
+
+ private class QueryResourceQueryResultPusher extends QueryResultPusher
+ {
+ private final HttpServletRequest req;
+ private final QueryLifecycle queryLifecycle;
+ private final ResourceIOReaderWriter io;
+
+ public QueryResourceQueryResultPusher(
+ HttpServletRequest req,
+ QueryLifecycle queryLifecycle,
+ ResourceIOReaderWriter io,
+ HttpServletResponse response
+ )
+ {
+ super(
+ response,
+ QueryResource.this.jsonMapper,
+ QueryResource.this.responseContextConfig,
+ QueryResource.this.selfNode,
+ QueryResource.this.counter,
+ queryLifecycle.getQueryId(),
+ MediaType.valueOf(io.getResponseWriter().getResponseType())
+ );
+ this.req = req;
+ this.queryLifecycle = queryLifecycle;
+ this.io = io;
+ }
+
+ @Override
+ public ResultsWriter start()
+ {
+ return new ResultsWriter()
+ {
+ @Override
+ public QueryResponse