From 0efd0879a83191ac550cded6122451ba4bf91194 Mon Sep 17 00:00:00 2001
From: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com>
Date: Mon, 19 Dec 2022 17:25:33 +0900
Subject: [PATCH] Unify the handling of HTTP between SQL and Native (#13564)
* Unify the handling of HTTP between SQL and Native
The SqlResource and QueryResource have been
using independent logic for things like error
handling and response context stuff. This
became abundantly clear and painful during a
change I was making for Window Functions, so
I unified them into using the same code for
walking the response and serializing it.
Things are still not perfectly unified (it would
be the absolute best if the SqlResource just
took SQL, planned it and then delegated the
query run entirely to the QueryResource), but
this refactor doesn't take that fully on.
The new code leverages async query processing
from our Jetty container; the different
interaction model with the Resource means that
a lot of tests had to be adjusted to align with
the async query model. The semantics of the
tests remain the same, with one exception: the
SqlResource used to not log requests that failed
authorization checks, but now it does.
---
.../apache/druid/query/QueryException.java | 122 +++-
.../druid/query/QueryTimeoutException.java | 7 +-
.../druid/query/QueryExceptionTest.java | 67 +-
.../query/QueryTimeoutExceptionTest.java | 24 +-
.../druid/tests/query/ITSqlCancelTest.java | 11 +-
.../druid/query/BadJsonQueryException.java | 3 +-
.../druid/query/BadQueryContextException.java | 8 +-
.../query/QueryCapacityExceededException.java | 14 +-
.../query/QueryInterruptedException.java | 23 +-
.../query/QueryUnsupportedException.java | 5 +-
.../query/ResourceLimitExceededException.java | 6 +-
.../druid/query/context/ResponseContext.java | 87 ++-
.../druid/client/JsonParserIterator.java | 33 +-
.../apache/druid/server/QueryResource.java | 388 ++++++-----
.../druid/server/QueryResultPusher.java | 418 ++++++++++++
.../PreResponseAuthorizationCheckFilter.java | 3 +-
.../druid/client/JsonParserIteratorTest.java | 2 +-
.../druid/server/QueryResourceTest.java | 606 +++++++-----------
.../server/coordinator/LoadQueuePeonTest.java | 6 +-
.../server/mocks/ExceptionalInputStream.java | 51 ++
.../druid/server/mocks/MockAsyncContext.java | 130 ++++
.../server/mocks/MockHttpServletRequest.java | 504 +++++++++++++++
.../server/mocks/MockHttpServletResponse.java | 316 +++++++++
.../AsyncQueryForwardingServletTest.java | 36 +-
.../org/apache/druid/sql/DirectStatement.java | 3 +-
.../druid/sql/SqlPlanningException.java | 7 +-
.../apache/druid/sql/http/SqlResource.java | 362 ++++++-----
.../druid/sql/http/SqlResourceTest.java | 420 ++++++------
28 files changed, 2622 insertions(+), 1040 deletions(-)
create mode 100644 server/src/main/java/org/apache/druid/server/QueryResultPusher.java
create mode 100644 server/src/test/java/org/apache/druid/server/mocks/ExceptionalInputStream.java
create mode 100644 server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
create mode 100644 server/src/test/java/org/apache/druid/server/mocks/MockHttpServletRequest.java
create mode 100644 server/src/test/java/org/apache/druid/server/mocks/MockHttpServletResponse.java
diff --git a/core/src/main/java/org/apache/druid/query/QueryException.java b/core/src/main/java/org/apache/druid/query/QueryException.java
index 10267b52539..93f17b6cff9 100644
--- a/core/src/main/java/org/apache/druid/query/QueryException.java
+++ b/core/src/main/java/org/apache/druid/query/QueryException.java
@@ -21,7 +21,6 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.common.exception.SanitizableException;
import javax.annotation.Nullable;
@@ -30,12 +29,123 @@ import java.net.InetAddress;
import java.util.function.Function;
/**
- * Base serializable error response
- *
+ * Base serializable error response.
+ *
+ * The Object Model that QueryException follows is a little non-intuitive as the primary way that a QueryException is
+ * generated is through a child class. However, those child classes are *not* equivalent to a QueryException, instead
+ * they act as a Factory of QueryException objects. This can be seen in two different places.
+ *
+ * 1. When sanitize() is called, the response is a QueryException without any indication of which original exception
+ * occurred.
+ * 2. When these objects get serialized across the wire the recipient deserializes a QueryException. The client is
+ * never expected, and fundamentally is not allowed to, ever deserialize a child class of QueryException.
+ *
+ * For this reason, QueryException must contain all potential state that any of its child classes could ever want to
+ * push across the wire. Additionally, any catch clauses expecting one of the child Exceptions must know that it is
+ * running inside of code where the exception has not traveled across the wire. If there is a chance that the
+ * exception could have been serialized across the wire, the code must catch a QueryException and check the errorCode
+ * instead.
+ *
+ * As a corollary, adding new state or adjusting the logic of this class must always be done in a backwards-compatible
+ * fashion across all child classes of QueryException.
+ *
+ * If there is any need to do different logic based on the type of error that has happened, the only reliable method
+ * of discerning the type of the error is to look at the errorCode String. Because these Strings are considered part
+ * of the API, they are not allowed to change and must maintain their same semantics. The known errorCode Strings
+ * are pulled together as public static fields on this class in order to make it more clear what options exist.
+ *
* QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
*/
public class QueryException extends RuntimeException implements SanitizableException
{
+ /**
+ * Error codes
+ */
+ public static final String JSON_PARSE_ERROR_CODE = "Json parse failed";
+ public static final String BAD_QUERY_CONTEXT_ERROR_CODE = "Query context parse failed";
+ public static final String QUERY_CAPACITY_EXCEEDED_ERROR_CODE = "Query capacity exceeded";
+ public static final String QUERY_INTERRUPTED_ERROR_CODE = "Query interrupted";
+ // Note: the proper spelling is with a single "l", but the version with
+ // two "l"s is documented, we can't change the text of the message.
+ public static final String QUERY_CANCELED_ERROR_CODE = "Query cancelled";
+ public static final String UNAUTHORIZED_ERROR_CODE = "Unauthorized request";
+ public static final String UNSUPPORTED_OPERATION_ERROR_CODE = "Unsupported operation";
+ public static final String TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE = "Truncated response context";
+ public static final String UNKNOWN_EXCEPTION_ERROR_CODE = "Unknown exception";
+ public static final String QUERY_TIMEOUT_ERROR_CODE = "Query timeout";
+ public static final String QUERY_UNSUPPORTED_ERROR_CODE = "Unsupported query";
+ public static final String RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = "Resource limit exceeded";
+ public static final String SQL_PARSE_FAILED_ERROR_CODE = "SQL parse failed";
+ public static final String PLAN_VALIDATION_FAILED_ERROR_CODE = "Plan validation failed";
+ public static final String SQL_QUERY_UNSUPPORTED_ERROR_CODE = "SQL query is unsupported";
+
+ public enum FailType
+ {
+ USER_ERROR(400),
+ UNAUTHORIZED(401),
+ CAPACITY_EXCEEDED(429),
+ UNKNOWN(500),
+ CANCELED(500),
+ QUERY_RUNTIME_FAILURE(500),
+ UNSUPPORTED(501),
+ TIMEOUT(504);
+
+ private final int expectedStatus;
+
+ FailType(int expectedStatus)
+ {
+ this.expectedStatus = expectedStatus;
+ }
+
+ public int getExpectedStatus()
+ {
+ return expectedStatus;
+ }
+ }
+
+ public static FailType fromErrorCode(String errorCode)
+ {
+ if (errorCode == null) {
+ return FailType.UNKNOWN;
+ }
+
+ switch (errorCode) {
+ case QUERY_CANCELED_ERROR_CODE:
+ return FailType.CANCELED;
+
+ // These error codes are generally expected to come from a QueryInterruptedException
+ case QUERY_INTERRUPTED_ERROR_CODE:
+ case UNSUPPORTED_OPERATION_ERROR_CODE:
+ case UNKNOWN_EXCEPTION_ERROR_CODE:
+ case TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE:
+ return FailType.QUERY_RUNTIME_FAILURE;
+ case UNAUTHORIZED_ERROR_CODE:
+ return FailType.UNAUTHORIZED;
+
+ case QUERY_CAPACITY_EXCEEDED_ERROR_CODE:
+ return FailType.CAPACITY_EXCEEDED;
+ case QUERY_TIMEOUT_ERROR_CODE:
+ return FailType.TIMEOUT;
+
+ // These error codes are expected to come from BadQueryExceptions
+ case JSON_PARSE_ERROR_CODE:
+ case BAD_QUERY_CONTEXT_ERROR_CODE:
+ case RESOURCE_LIMIT_EXCEEDED_ERROR_CODE:
+ // And these ones from the SqlPlanningException which are also BadQueryExceptions
+ case SQL_PARSE_FAILED_ERROR_CODE:
+ case PLAN_VALIDATION_FAILED_ERROR_CODE:
+ case SQL_QUERY_UNSUPPORTED_ERROR_CODE:
+ return FailType.USER_ERROR;
+ case QUERY_UNSUPPORTED_ERROR_CODE:
+ return FailType.UNSUPPORTED;
+ default:
+ return FailType.UNKNOWN;
+ }
+ }
+
+ /**
+ * Implementation
+ */
private final String errorCode;
private final String errorClass;
private final String host;
@@ -48,7 +158,6 @@ public class QueryException extends RuntimeException implements SanitizableExcep
this.host = host;
}
- @VisibleForTesting
@JsonCreator
public QueryException(
@JsonProperty("error") @Nullable String errorCode,
@@ -105,4 +214,9 @@ public class QueryException extends RuntimeException implements SanitizableExcep
{
return new QueryException(errorCode, errorMessageTransformFunction.apply(getMessage()), null, null);
}
+
+ public FailType getFailType()
+ {
+ return fromErrorCode(errorCode);
+ }
}
diff --git a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
index d3626e9c5e6..7bd4924000a 100644
--- a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
+++ b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
@@ -36,7 +36,6 @@ import javax.annotation.Nullable;
public class QueryTimeoutException extends QueryException
{
private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
- public static final String ERROR_CODE = "Query timeout";
public static final String ERROR_MESSAGE = "Query Timed Out!";
public static final int STATUS_CODE = 504;
@@ -53,16 +52,16 @@ public class QueryTimeoutException extends QueryException
public QueryTimeoutException()
{
- super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
+ super(QUERY_TIMEOUT_ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ super(QUERY_TIMEOUT_ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
public QueryTimeoutException(String errorMessage, String host)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
+ super(QUERY_TIMEOUT_ERROR_CODE, errorMessage, ERROR_CLASS, host);
}
}
diff --git a/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
index 51ff763762c..446e94b9667 100644
--- a/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
+++ b/core/src/test/java/org/apache/druid/query/QueryExceptionTest.java
@@ -19,17 +19,12 @@
package org.apache.druid.query;
+import org.apache.druid.query.QueryException.FailType;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
-@RunWith(MockitoJUnitRunner.class)
public class QueryExceptionTest
{
private static final String ERROR_CODE = "error code";
@@ -38,36 +33,72 @@ public class QueryExceptionTest
private static final String ERROR_MESSAGE_ORIGINAL = "aaaa";
private static final String ERROR_MESSAGE_TRANSFORMED = "bbbb";
- @Mock
- private Function trasformFunction;
-
@Test
public void testSanitizeWithTransformFunctionReturningNull()
{
- Mockito.when(trasformFunction.apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL))).thenReturn(null);
QueryException queryException = new QueryException(ERROR_CODE, ERROR_MESSAGE_ORIGINAL, ERROR_CLASS, HOST);
- QueryException actual = queryException.sanitize(trasformFunction);
+
+ AtomicLong callCount = new AtomicLong(0);
+ QueryException actual = queryException.sanitize(s -> {
+ callCount.incrementAndGet();
+ Assert.assertEquals(ERROR_MESSAGE_ORIGINAL, s);
+ return null;
+ });
+
Assert.assertNotNull(actual);
Assert.assertEquals(actual.getErrorCode(), ERROR_CODE);
Assert.assertNull(actual.getMessage());
Assert.assertNull(actual.getHost());
Assert.assertNull(actual.getErrorClass());
- Mockito.verify(trasformFunction).apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL));
- Mockito.verifyNoMoreInteractions(trasformFunction);
+ Assert.assertEquals(1, callCount.get());
}
@Test
public void testSanitizeWithTransformFunctionReturningNewString()
{
- Mockito.when(trasformFunction.apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL))).thenReturn(ERROR_MESSAGE_TRANSFORMED);
QueryException queryException = new QueryException(ERROR_CODE, ERROR_MESSAGE_ORIGINAL, ERROR_CLASS, HOST);
- QueryException actual = queryException.sanitize(trasformFunction);
+
+ AtomicLong callCount = new AtomicLong(0);
+ QueryException actual = queryException.sanitize(s -> {
+ callCount.incrementAndGet();
+ Assert.assertEquals(ERROR_MESSAGE_ORIGINAL, s);
+ return ERROR_MESSAGE_TRANSFORMED;
+ });
+
Assert.assertNotNull(actual);
Assert.assertEquals(actual.getErrorCode(), ERROR_CODE);
Assert.assertEquals(actual.getMessage(), ERROR_MESSAGE_TRANSFORMED);
Assert.assertNull(actual.getHost());
Assert.assertNull(actual.getErrorClass());
- Mockito.verify(trasformFunction).apply(ArgumentMatchers.eq(ERROR_MESSAGE_ORIGINAL));
- Mockito.verifyNoMoreInteractions(trasformFunction);
+ Assert.assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testSanity()
+ {
+ expectFailTypeForCode(FailType.UNKNOWN, null);
+ expectFailTypeForCode(FailType.UNKNOWN, "Nobody knows me.");
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.UNKNOWN_EXCEPTION_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.JSON_PARSE_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.BAD_QUERY_CONTEXT_ERROR_CODE);
+ expectFailTypeForCode(FailType.CAPACITY_EXCEEDED, QueryException.QUERY_CAPACITY_EXCEEDED_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.QUERY_INTERRUPTED_ERROR_CODE);
+ expectFailTypeForCode(FailType.CANCELED, QueryException.QUERY_CANCELED_ERROR_CODE);
+ expectFailTypeForCode(FailType.UNAUTHORIZED, QueryException.UNAUTHORIZED_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.UNSUPPORTED_OPERATION_ERROR_CODE);
+ expectFailTypeForCode(FailType.QUERY_RUNTIME_FAILURE, QueryException.TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE);
+ expectFailTypeForCode(FailType.TIMEOUT, QueryException.QUERY_TIMEOUT_ERROR_CODE);
+ expectFailTypeForCode(FailType.UNSUPPORTED, QueryException.QUERY_UNSUPPORTED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.SQL_PARSE_FAILED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.PLAN_VALIDATION_FAILED_ERROR_CODE);
+ expectFailTypeForCode(FailType.USER_ERROR, QueryException.SQL_QUERY_UNSUPPORTED_ERROR_CODE);
+ }
+
+ private void expectFailTypeForCode(FailType expected, String code)
+ {
+ QueryException exception = new QueryException(new Exception(), code, "java.lang.Exception", "test");
+
+ Assert.assertEquals(code, expected, exception.getFailType());
}
}
diff --git a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
index ab187a1fb42..acc6b44d58f 100644
--- a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
+++ b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
@@ -19,7 +19,10 @@
package org.apache.druid.query;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import org.junit.Assert;
import org.junit.Test;
@@ -30,7 +33,26 @@ public class QueryTimeoutExceptionTest
@Test
public void testSerde() throws IOException
{
- final ObjectMapper mapper = new ObjectMapper();
+ // We re-create the configuration from DefaultObjectMapper here because this is in `core` and
+ // DefaultObjectMapper is in `processing`. Hopefully that distinction disappears at some point
+ // in time, but it exists today and moving things one way or the other quickly turns into just
+ // chunking it all together.
+ final ObjectMapper mapper = new ObjectMapper()
+ {
+ {
+ configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+ // See https://github.com/FasterXML/jackson-databind/issues/170
+ // configure(MapperFeature.AUTO_DETECT_CREATORS, false);
+ configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+ configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+ configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+ configure(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS, false);
+ configure(SerializationFeature.INDENT_OUTPUT, false);
+ configure(SerializationFeature.FLUSH_AFTER_WRITE_VALUE, false);
+ }
+ };
+
QueryTimeoutException timeoutException = mapper.readValue(
mapper.writeValueAsBytes(new QueryTimeoutException()),
QueryTimeoutException.class
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
index ad8dd3cb11f..d5bc1e30204 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
@@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.QueryException;
-import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
@@ -107,10 +106,10 @@ public class ITSqlCancelTest
throw new ISE("Query is not canceled after cancel request");
}
QueryException queryException = jsonMapper.readValue(queryResponse.getContent(), QueryException.class);
- if (!QueryInterruptedException.QUERY_CANCELED.equals(queryException.getErrorCode())) {
+ if (!"Query cancelled".equals(queryException.getErrorCode())) {
throw new ISE(
"Expected error code [%s], actual [%s]",
- QueryInterruptedException.QUERY_CANCELED,
+ "Query cancelled",
queryException.getErrorCode()
);
}
@@ -138,7 +137,11 @@ public class ITSqlCancelTest
final StatusResponseHolder queryResponse = queryResponseFuture.get(30, TimeUnit.SECONDS);
if (!queryResponse.getStatus().equals(HttpResponseStatus.OK)) {
- throw new ISE("Cancel request failed with status[%s] and content[%s]", queryResponse.getStatus(), queryResponse.getContent());
+ throw new ISE(
+ "Cancel request failed with status[%s] and content[%s]",
+ queryResponse.getStatus(),
+ queryResponse.getContent()
+ );
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
index 8be18edf18a..47ba3c90777 100644
--- a/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
+++ b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
@@ -25,12 +25,11 @@ import com.fasterxml.jackson.core.JsonParseException;
public class BadJsonQueryException extends BadQueryException
{
- public static final String ERROR_CODE = "Json parse failed";
public static final String ERROR_CLASS = JsonParseException.class.getName();
public BadJsonQueryException(JsonParseException e)
{
- this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
+ this(JSON_PARSE_ERROR_CODE, e.getMessage(), ERROR_CLASS);
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java b/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
index 29f63b1f40e..cbfb0ca410c 100644
--- a/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
+++ b/processing/src/main/java/org/apache/druid/query/BadQueryContextException.java
@@ -24,17 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class BadQueryContextException extends BadQueryException
{
- public static final String ERROR_CODE = "Query context parse failed";
public static final String ERROR_CLASS = BadQueryContextException.class.getName();
- public BadQueryContextException(Exception e)
- {
- this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
- }
-
public BadQueryContextException(String msg)
{
- this(ERROR_CODE, msg, ERROR_CLASS);
+ this(BAD_QUERY_CONTEXT_ERROR_CODE, msg, ERROR_CLASS);
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
index f62eb9166d8..694fbb780cb 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
@@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
*
When the query is rejected by QueryScheduler.
*
When the query cannot acquire enough merge buffers for groupBy v2
*
- *
+ *
* As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
*/
@@ -43,17 +43,16 @@ public class QueryCapacityExceededException extends QueryException
private static final String LANE_ERROR_MESSAGE_TEMPLATE =
"Too many concurrent queries for lane '%s', query capacity of %s exceeded. Please try your query again later.";
private static final String ERROR_CLASS = QueryCapacityExceededException.class.getName();
- public static final String ERROR_CODE = "Query capacity exceeded";
public static final int STATUS_CODE = 429;
public QueryCapacityExceededException(int capacity)
{
- super(ERROR_CODE, makeTotalErrorMessage(capacity), ERROR_CLASS, null);
+ super(QUERY_CAPACITY_EXCEEDED_ERROR_CODE, makeTotalErrorMessage(capacity), ERROR_CLASS, null);
}
public QueryCapacityExceededException(String lane, int capacity)
{
- super(ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
+ super(QUERY_CAPACITY_EXCEEDED_ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
}
/**
@@ -62,7 +61,12 @@ public class QueryCapacityExceededException extends QueryException
*/
public static QueryCapacityExceededException withErrorMessageAndResolvedHost(String errorMessage)
{
- return new QueryCapacityExceededException(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ return new QueryCapacityExceededException(
+ QUERY_CAPACITY_EXCEEDED_ERROR_CODE,
+ errorMessage,
+ ERROR_CLASS,
+ resolveHostname()
+ );
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
index ae67039242f..91760aa7c10 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CancellationException;
/**
* Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually
* used on the client side for *all* kinds of failed queries.
- *
+ *
* Fields:
* - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception"
* represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized
@@ -37,21 +37,12 @@ import java.util.concurrent.CancellationException;
* - "errorMessage" is the toString of the wrapped exception
* - "errorClass" is the class of the wrapped exception
* - "host" is the host that the errorCode occurred on
- *
+ *
* The QueryResource is expected to emit the JSON form of this object when errors happen, and the DirectDruidClient
* deserializes and wraps them.
*/
public class QueryInterruptedException extends QueryException
{
- public static final String QUERY_INTERRUPTED = "Query interrupted";
- // Note: the proper spelling is with a single "l", but the version with
- // two "l"s is documented, we can't change the text of the message.
- public static final String QUERY_CANCELED = "Query cancelled";
- public static final String UNAUTHORIZED = "Unauthorized request";
- public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
- public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context";
- public static final String UNKNOWN_EXCEPTION = "Unknown exception";
-
@JsonCreator
public QueryInterruptedException(
@JsonProperty("error") @Nullable String errorCode,
@@ -96,15 +87,15 @@ public class QueryInterruptedException extends QueryException
if (e instanceof QueryInterruptedException) {
return ((QueryInterruptedException) e).getErrorCode();
} else if (e instanceof InterruptedException) {
- return QUERY_INTERRUPTED;
+ return QUERY_INTERRUPTED_ERROR_CODE;
} else if (e instanceof CancellationException) {
- return QUERY_CANCELED;
+ return QUERY_CANCELED_ERROR_CODE;
} else if (e instanceof UnsupportedOperationException) {
- return UNSUPPORTED_OPERATION;
+ return UNSUPPORTED_OPERATION_ERROR_CODE;
} else if (e instanceof TruncatedResponseContextException) {
- return TRUNCATED_RESPONSE_CONTEXT;
+ return TRUNCATED_RESPONSE_CONTEXT_ERROR_CODE;
} else {
- return UNKNOWN_EXCEPTION;
+ return UNKNOWN_EXCEPTION_ERROR_CODE;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
index bde1f9d14e1..81d82a94871 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
@@ -29,14 +29,13 @@ import javax.annotation.Nullable;
* This exception is for the query engine to surface when a query cannot be run. This can be due to the
* following reasons: 1) The query is not supported yet. 2) The query is not something Druid would ever supports.
* For these cases, the exact causes and details should also be documented in Druid user facing documents.
- *
+ *
* As a {@link QueryException} it is expected to be serialized to a json response with a proper HTTP error code
* ({@link #STATUS_CODE}).
*/
public class QueryUnsupportedException extends QueryException
{
private static final String ERROR_CLASS = QueryUnsupportedException.class.getName();
- public static final String ERROR_CODE = "Unsupported query";
public static final int STATUS_CODE = 501;
@JsonCreator
@@ -52,6 +51,6 @@ public class QueryUnsupportedException extends QueryException
public QueryUnsupportedException(String errorMessage)
{
- super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ super(QUERY_UNSUPPORTED_ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
index 169d774cabf..a41699cf442 100644
--- a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
@@ -25,7 +25,7 @@ import org.apache.druid.java.util.common.StringUtils;
/**
* Exception indicating that an operation failed because it exceeded some configured resource limit.
- *
+ *
* This is a {@link BadQueryException} because it likely indicates a user's misbehavior when this exception is thrown.
* The resource limitations set by Druid cluster operators are typically less flexible than the parameters of
* a user query, so when a user query requires too many resources, the likely remedy is that the user query
@@ -33,8 +33,6 @@ import org.apache.druid.java.util.common.StringUtils;
*/
public class ResourceLimitExceededException extends BadQueryException
{
- public static final String ERROR_CODE = "Resource limit exceeded";
-
public static ResourceLimitExceededException withMessage(String message, Object... arguments)
{
return new ResourceLimitExceededException(StringUtils.nonStrictFormat(message, arguments));
@@ -47,7 +45,7 @@ public class ResourceLimitExceededException extends BadQueryException
public ResourceLimitExceededException(String message)
{
- this(ERROR_CODE, message, ResourceLimitExceededException.class.getName());
+ this(RESOURCE_LIMIT_EXCEEDED_ERROR_CODE, message, ResourceLimitExceededException.class.getName());
}
@JsonCreator
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index a943297d173..6727782cc40 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -80,18 +80,18 @@ import java.util.stream.Collectors;
*
Manages headers size by dropping fields when the header would get too
* large.
*
- *
+ *
* A result is that the information the context, when inspected by a calling
* query, may be incomplete if some of it was previously dropped by the
* called query.
*
*
API
- *
+ *
* The query profile needs to obtain the full, untruncated information. To do this
* it piggy-backs on the set operations to obtain the full value. To ensure this
* is possible, code that works with standard values should call the set (or add)
* functions provided which will do the needed map update.
- */
+ */
@PublicApi
public abstract class ResponseContext
{
@@ -118,7 +118,7 @@ public abstract class ResponseContext
/**
* Merges two values of type T.
- *
+ *
* This method may modify "oldValue" but must not modify "newValue".
*/
Object mergeValues(Object oldValue, Object newValue);
@@ -317,7 +317,8 @@ public abstract class ResponseContext
true, true,
new TypeReference>()
{
- })
+ }
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -334,14 +335,15 @@ public abstract class ResponseContext
*/
public static final Key UNCOVERED_INTERVALS_OVERFLOWED = new BooleanKey(
"uncoveredIntervalsOverflowed",
- true);
+ true
+ );
/**
* Map of most relevant query ID to remaining number of responses from query nodes.
* The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes,
* and is updated whenever they respond (@code DirectDruidClient). {@code RetryQueryRunner} uses this value to
* check if the {@link #MISSING_SEGMENTS} is valid.
- *
+ *
* Currently, the broker doesn't run subqueries in parallel, the remaining number of responses will be updated
* one by one per subquery. However, since it can be parallelized to run subqueries simultaneously, we store them
* in a ConcurrentHashMap.
@@ -351,7 +353,8 @@ public abstract class ResponseContext
public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey(
"remainingResponsesFromQueryServers",
false, true,
- Object.class)
+ Object.class
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -361,7 +364,8 @@ public abstract class ResponseContext
final NonnullPair pair = (NonnullPair) idAndNumResponses;
map.compute(
pair.lhs,
- (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs);
+ (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
+ );
return map;
}
};
@@ -372,7 +376,10 @@ public abstract class ResponseContext
public static final Key MISSING_SEGMENTS = new AbstractKey(
"missingSegments",
true, true,
- new TypeReference>() {})
+ new TypeReference>()
+ {
+ }
+ )
{
@Override
@SuppressWarnings("unchecked")
@@ -396,7 +403,10 @@ public abstract class ResponseContext
public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
"queryTotalBytesGathered",
false, false,
- new TypeReference() {})
+ new TypeReference()
+ {
+ }
+ )
{
@Override
public Object mergeValues(Object oldValue, Object newValue)
@@ -410,7 +420,8 @@ public abstract class ResponseContext
*/
public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
"queryFailTime",
- false);
+ false
+ );
/**
* This variable indicates when a running query should be expired,
@@ -418,17 +429,19 @@ public abstract class ResponseContext
*/
public static final Key TIMEOUT_AT = new LongKey(
"timeoutAt",
- false);
+ false
+ );
/**
* The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
- *
+ *
* Named "count" for backwards compatibility with older data servers that still send this, even though it's now
* marked as internal.
*/
public static final Key NUM_SCANNED_ROWS = new CounterKey(
"count",
- false);
+ false
+ );
/**
* The total CPU time for threads related to Sequence processing of the query.
@@ -437,14 +450,16 @@ public abstract class ResponseContext
*/
public static final Key CPU_CONSUMED_NANOS = new CounterKey(
"cpuConsumed",
- false);
+ false
+ );
/**
* Indicates if a {@link ResponseContext} was truncated during serialization.
*/
public static final Key TRUNCATED = new BooleanKey(
"truncated",
- false);
+ false
+ );
/**
* One and only global list of keys. This is a semi-constant: it is mutable
@@ -461,20 +476,21 @@ public abstract class ResponseContext
private final ConcurrentMap registeredKeys = new ConcurrentSkipListMap<>();
static {
- instance().registerKeys(new Key[]
- {
- UNCOVERED_INTERVALS,
- UNCOVERED_INTERVALS_OVERFLOWED,
- REMAINING_RESPONSES_FROM_QUERY_SERVERS,
- MISSING_SEGMENTS,
- ETAG,
- QUERY_TOTAL_BYTES_GATHERED,
- QUERY_FAIL_DEADLINE_MILLIS,
- TIMEOUT_AT,
- NUM_SCANNED_ROWS,
- CPU_CONSUMED_NANOS,
- TRUNCATED,
- });
+ instance().registerKeys(
+ new Key[]{
+ UNCOVERED_INTERVALS,
+ UNCOVERED_INTERVALS_OVERFLOWED,
+ REMAINING_RESPONSES_FROM_QUERY_SERVERS,
+ MISSING_SEGMENTS,
+ ETAG,
+ QUERY_TOTAL_BYTES_GATHERED,
+ QUERY_FAIL_DEADLINE_MILLIS,
+ TIMEOUT_AT,
+ NUM_SCANNED_ROWS,
+ CPU_CONSUMED_NANOS,
+ TRUNCATED,
+ }
+ );
}
/**
@@ -701,8 +717,10 @@ public abstract class ResponseContext
public void addRemainingResponse(String id, int count)
{
- addValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
- new NonnullPair<>(id, count));
+ addValue(
+ Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
+ new NonnullPair<>(id, count)
+ );
}
public void addMissingSegments(List descriptors)
@@ -820,7 +838,6 @@ public abstract class ResponseContext
*
* @param node {@link ArrayNode} which elements are being removed.
* @param target the number of chars need to be removed.
- *
* @return the number of removed chars.
*/
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int target)
@@ -851,7 +868,7 @@ public abstract class ResponseContext
private final String truncatedResult;
private final String fullResult;
- SerializationResult(@Nullable String truncatedResult, String fullResult)
+ public SerializationResult(@Nullable String truncatedResult, String fullResult)
{
this.truncatedResult = truncatedResult;
this.fullResult = fullResult;
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 97c772ed192..e14e24c2231 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -168,7 +168,11 @@ public class JsonParserIterator implements Iterator, Closeable
} else if (checkTimeout()) {
throw timeoutQuery();
} else {
- // TODO: NettyHttpClient should check the actual cause of the failure and set it in the future properly.
+        // The InputStream is null and we have not timed out; there might be multiple reasons why we could hit this
+        // condition, so we guess that we are hitting it because of scatter-gather bytes. It would be better to be more
+        // explicit about why errors are happening than guessing, but this comment is being rewritten from a T-O-D-O,
+        // so the intent is just to document this better rather than do all of the logic to fix it. If/when we get
+        // this exception thrown for other reasons, it would be great to document what other reasons this can happen.
throw ResourceLimitExceededException.withMessage(
"Possibly max scatter-gather bytes limit reached while reading from url[%s].",
url
@@ -207,11 +211,11 @@ public class JsonParserIterator implements Iterator, Closeable
/**
* Converts the given exception to a proper type of {@link QueryException}.
* The use cases of this method are:
- *
+ *
* - All non-QueryExceptions are wrapped with {@link QueryInterruptedException}.
* - The QueryException from {@link DirectDruidClient} is converted to a more specific type of QueryException
- * based on {@link QueryException#getErrorCode()}. During conversion, {@link QueryException#host} is overridden
- * by {@link #host}.
+ * based on {@link QueryException#getErrorCode()}. During conversion, {@link QueryException#host} is overridden
+ * by {@link #host}.
*/
private QueryException convertException(Throwable cause)
{
@@ -219,9 +223,9 @@ public class JsonParserIterator implements Iterator, Closeable
if (cause instanceof QueryException) {
final QueryException queryException = (QueryException) cause;
if (queryException.getErrorCode() == null) {
- // errorCode should not be null now, but maybe could be null in the past..
+ // errorCode should not be null now, but maybe could be null in the past...
return new QueryInterruptedException(
- queryException.getErrorCode(),
+ QueryException.UNKNOWN_EXCEPTION_ERROR_CODE,
queryException.getMessage(),
queryException.getErrorClass(),
host
@@ -229,32 +233,37 @@ public class JsonParserIterator implements Iterator, Closeable
}
// Note: this switch clause is to restore the 'type' information of QueryExceptions which is lost during
- // JSON serialization. This is not a good way to restore the correct exception type. Rather, QueryException
- // should store its type when it is serialized, so that we can know the exact type when it is deserialized.
+ // JSON serialization. As documented on the QueryException class, the errorCode of QueryException is the only
+ // way to differentiate the cause of the exception. This code does not cover all possible exceptions that
+      // could come up and so, likely, doesn't restore exception types reliably. The only safe way to catch and interact
+ // with a QueryException is to catch QueryException and check its errorCode. In some future code change, we
+ // should likely remove this switch entirely, but when we do that, we need to make sure to also adjust any
+ // points in the code that are catching the specific child Exceptions to instead catch QueryException and
+ // check the errorCode.
switch (queryException.getErrorCode()) {
// The below is the list of exceptions that can be thrown in historicals and propagated to the broker.
- case QueryTimeoutException.ERROR_CODE:
+ case QueryException.QUERY_TIMEOUT_ERROR_CODE:
return new QueryTimeoutException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case QueryCapacityExceededException.ERROR_CODE:
+ case QueryException.QUERY_CAPACITY_EXCEEDED_ERROR_CODE:
return new QueryCapacityExceededException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case QueryUnsupportedException.ERROR_CODE:
+ case QueryException.QUERY_UNSUPPORTED_ERROR_CODE:
return new QueryUnsupportedException(
queryException.getErrorCode(),
queryException.getMessage(),
queryException.getErrorClass(),
host
);
- case ResourceLimitExceededException.ERROR_CODE:
+ case QueryException.RESOURCE_LIMIT_EXCEEDED_ERROR_CODE:
return new ResourceLimitExceededException(
queryException.getErrorCode(),
queryException.getMessage(),
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 743ca9e60ba..84e6acf24c1 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
@@ -30,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
-import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.guice.LazySingleton;
@@ -38,20 +38,13 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BadJsonQueryException;
-import org.apache.druid.query.BadQueryException;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.TruncatedResponseContextException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Keys;
@@ -64,7 +57,9 @@ import org.apache.druid.server.security.ForbiddenException;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
+import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
@@ -72,12 +67,10 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -90,6 +83,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class QueryResource implements QueryCountStatsProvider
{
protected static final EmittingLogger log = new EmittingLogger(QueryResource.class);
+ public static final EmittingLogger NO_STACK_LOGGER = log.noStackTrace();
+
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
protected static final String APPLICATION_SMILE = "application/smile";
@@ -116,6 +111,7 @@ public class QueryResource implements QueryCountStatsProvider
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();
private final AtomicLong timedOutQueryCount = new AtomicLong();
+ private final QueryResourceQueryMetricCounter counter = new QueryResourceQueryMetricCounter();
@Inject
public QueryResource(
@@ -171,23 +167,28 @@ public class QueryResource implements QueryCountStatsProvider
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+ @Nullable
public Response doPost(
final InputStream in,
@QueryParam("pretty") final String pretty,
-
- // used to get request content-type,Accept header, remote address and auth-related headers
@Context final HttpServletRequest req
) throws IOException
{
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
- final ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null);
+ final ResourceIOReaderWriter io = createResourceIOReaderWriter(req, pretty != null);
final String currThreadName = Thread.currentThread().getName();
try {
- final Query> query = readQuery(req, in, ioReaderWriter);
+ final Query> query;
+ try {
+ query = readQuery(req, in, io);
+ }
+ catch (QueryException e) {
+ return io.getResponseWriter().buildNonOkResponse(e.getFailType().getExpectedStatus(), e);
+ }
+
queryLifecycle.initialize(query);
- final String queryId = queryLifecycle.getQueryId();
final String queryThreadName = queryLifecycle.threadName(currThreadName);
Thread.currentThread().setName(queryThreadName);
@@ -195,137 +196,88 @@ public class QueryResource implements QueryCountStatsProvider
log.debug("Got query [%s]", queryLifecycle.getQuery());
}
- final Access authResult = queryLifecycle.authorize(req);
+ final Access authResult;
+ try {
+ authResult = queryLifecycle.authorize(req);
+ }
+ catch (RuntimeException e) {
+ final QueryException qe;
+
+ if (e instanceof QueryException) {
+ qe = (QueryException) e;
+ } else {
+ qe = new QueryInterruptedException(e);
+ }
+
+ return io.getResponseWriter().buildNonOkResponse(qe.getFailType().getExpectedStatus(), qe);
+ }
+
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}
- final QueryResponse> queryResponse = queryLifecycle.execute();
- final Sequence> results = queryResponse.getResults();
- final ResponseContext responseContext = queryResponse.getResponseContext();
- final String prevEtag = getPreviousEtag(req);
-
- if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
- queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
- successfulQueryCount.incrementAndGet();
- return Response.notModified().build();
- }
-
- final Yielder> yielder = Yielders.each(results);
+ // We use an async context not because we are actually going to run this async, but because we want to delay
+ // the decision of what the response code should be until we have gotten the first few data points to return.
+ // Returning a Response object from this point forward requires that object to know the status code, which we
+ // don't actually know until we are in the accumulator, but if we try to return a Response object from the
+ // accumulator, we cannot properly stream results back, because the accumulator won't release control of the
+ // Response until it has consumed the underlying Sequence.
+ final AsyncContext asyncContext = req.startAsync();
try {
- final ObjectWriter jsonWriter = queryLifecycle.newOutputWriter(ioReaderWriter);
-
- Response.ResponseBuilder responseBuilder = Response
- .ok(
- new StreamingOutput()
- {
- @Override
- public void write(OutputStream outputStream) throws WebApplicationException
- {
- Exception e = null;
-
- CountingOutputStream os = new CountingOutputStream(outputStream);
- try {
- // json serializer will always close the yielder
- jsonWriter.writeValue(os, yielder);
-
- os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
- os.close();
- }
- catch (Exception ex) {
- e = ex;
- log.noStackTrace().error(ex, "Unable to send query response.");
- throw new RuntimeException(ex);
- }
- finally {
- Thread.currentThread().setName(currThreadName);
-
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), os.getCount());
-
- if (e == null) {
- successfulQueryCount.incrementAndGet();
- } else {
- failedQueryCount.incrementAndGet();
- }
- }
- }
- },
- ioReaderWriter.getResponseWriter().getResponseType()
- )
- .header(QUERY_ID_RESPONSE_HEADER, queryId);
-
- attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper,
- responseContextConfig, selfNode
- );
-
- return responseBuilder.build();
- }
- catch (QueryException e) {
- // make sure to close yielder if anything happened before starting to serialize the response.
- yielder.close();
- throw e;
- }
- catch (Exception e) {
- // make sure to close yielder if anything happened before starting to serialize the response.
- yielder.close();
- throw new RuntimeException(e);
+ new QueryResourceQueryResultPusher(req, queryLifecycle, io, (HttpServletResponse) asyncContext.getResponse())
+ .push();
}
finally {
- // do not close yielder here, since we do not want to close the yielder prior to
- // StreamingOutput having iterated over all the results
+ asyncContext.complete();
}
}
- catch (QueryInterruptedException e) {
- interruptedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotError(e);
- }
- catch (QueryTimeoutException timeout) {
- timedOutQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(timeout, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotTimeout(timeout);
- }
- catch (QueryCapacityExceededException cap) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotLimited(cap);
- }
- catch (QueryUnsupportedException unsupported) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(unsupported, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotUnsupported(unsupported);
- }
- catch (BadQueryException e) {
- interruptedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
- return ioReaderWriter.getResponseWriter().gotBadQuery(e);
- }
- catch (ForbiddenException e) {
- // don't do anything for an authorization failure, ForbiddenExceptionMapper will catch this later and
- // send an error response if this is thrown.
- throw e;
- }
catch (Exception e) {
- failedQueryCount.incrementAndGet();
- queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
+ if (e instanceof ForbiddenException && !req.isAsyncStarted()) {
+ // We can only pass through the Forbidden exception if we haven't started async yet.
+ throw e;
+ }
+ log.warn(e, "Uncaught exception from query processing. This should be caught and handled directly.");
- log.noStackTrace()
- .makeAlert(e, "Exception handling request")
- .addData(
- "query",
- queryLifecycle.getQuery() != null
- ? jsonMapper.writeValueAsString(queryLifecycle.getQuery())
- : "unparseable query"
- )
- .addData("peer", req.getRemoteAddr())
- .emit();
-
- return ioReaderWriter.getResponseWriter().gotError(e);
+ // Just fall back to the async context.
+ AsyncContext asyncContext = req.startAsync();
+ try {
+ final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+ // If the response is committed, we actually processed and started doing things with the request,
+ // so the best we can do is just complete in the finally and hope for the best.
+ if (!response.isCommitted()) {
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ response.setContentType(MediaType.APPLICATION_JSON);
+ try (OutputStream out = response.getOutputStream()) {
+ final QueryException responseException = new QueryException(
+ QueryException.UNKNOWN_EXCEPTION_ERROR_CODE,
+ "Unhandled exception made it to the top",
+ e.getClass().getName(),
+ req.getRemoteHost()
+ );
+ out.write(jsonMapper.writeValueAsBytes(responseException));
+ }
+ }
+ }
+ finally {
+ asyncContext.complete();
+ }
}
finally {
Thread.currentThread().setName(currThreadName);
}
+ return null;
+ }
+
+ public interface QueryMetricCounter
+ {
+ void incrementSuccess();
+
+ void incrementFailed();
+
+ void incrementInterrupted();
+
+ void incrementTimedOut();
}
public static void attachResponseContextToHttpResponse(
@@ -416,16 +368,20 @@ public class QueryResource implements QueryCountStatsProvider
// response type defaults to Content-Type if 'Accept' header not provided
String responseType = Strings.isNullOrEmpty(acceptHeader) ? requestType : acceptHeader;
- boolean isRequestSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || APPLICATION_SMILE.equals(requestType);
- boolean isResponseSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(responseType) || APPLICATION_SMILE.equals(responseType);
+ boolean isRequestSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || APPLICATION_SMILE.equals(
+ requestType);
+ boolean isResponseSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(responseType)
+ || APPLICATION_SMILE.equals(responseType);
return new ResourceIOReaderWriter(
isRequestSmile ? smileMapper : jsonMapper,
- new ResourceIOWriter(isResponseSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON,
- isResponseSmile ? smileMapper : jsonMapper,
- isResponseSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
- pretty
- ));
+ new ResourceIOWriter(
+ isResponseSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON,
+ isResponseSmile ? smileMapper : jsonMapper,
+ isResponseSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
+ pretty
+ )
+ );
}
protected static class ResourceIOReaderWriter
@@ -504,26 +460,6 @@ public class QueryResource implements QueryCountStatsProvider
);
}
- Response gotTimeout(QueryTimeoutException e) throws IOException
- {
- return buildNonOkResponse(QueryTimeoutException.STATUS_CODE, e);
- }
-
- Response gotLimited(QueryCapacityExceededException e) throws IOException
- {
- return buildNonOkResponse(QueryCapacityExceededException.STATUS_CODE, e);
- }
-
- Response gotUnsupported(QueryUnsupportedException e) throws IOException
- {
- return buildNonOkResponse(QueryUnsupportedException.STATUS_CODE, e);
- }
-
- Response gotBadQuery(BadQueryException e) throws IOException
- {
- return buildNonOkResponse(BadQueryException.STATUS_CODE, e);
- }
-
Response buildNonOkResponse(int status, Exception e) throws JsonProcessingException
{
return Response.status(status)
@@ -565,4 +501,142 @@ public class QueryResource implements QueryCountStatsProvider
builder.header(HEADER_ETAG, entityTag);
}
}
+
+ private class QueryResourceQueryMetricCounter implements QueryMetricCounter
+ {
+ @Override
+ public void incrementSuccess()
+ {
+ successfulQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementFailed()
+ {
+ failedQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementInterrupted()
+ {
+ interruptedQueryCount.incrementAndGet();
+ }
+
+ @Override
+ public void incrementTimedOut()
+ {
+ timedOutQueryCount.incrementAndGet();
+ }
+ }
+
+ private class QueryResourceQueryResultPusher extends QueryResultPusher
+ {
+ private final HttpServletRequest req;
+ private final QueryLifecycle queryLifecycle;
+ private final ResourceIOReaderWriter io;
+
+ public QueryResourceQueryResultPusher(
+ HttpServletRequest req,
+ QueryLifecycle queryLifecycle,
+ ResourceIOReaderWriter io,
+ HttpServletResponse response
+ )
+ {
+ super(
+ response,
+ QueryResource.this.jsonMapper,
+ QueryResource.this.responseContextConfig,
+ QueryResource.this.selfNode,
+ QueryResource.this.counter,
+ queryLifecycle.getQueryId(),
+ MediaType.valueOf(io.getResponseWriter().getResponseType())
+ );
+ this.req = req;
+ this.queryLifecycle = queryLifecycle;
+ this.io = io;
+ }
+
+ @Override
+ public ResultsWriter start()
+ {
+ return new ResultsWriter()
+ {
+ @Override
+ public QueryResponse