mirror of https://github.com/apache/druid.git
Fix compatibility issue with SqlTaskResource (#14466)
* Fix compatibility issue with SqlTaskResource The DruidException changes broke the response format for errors coming back from the SqlTaskResource, so fix those
This commit is contained in:
parent
9b1779734b
commit
7e2cf35d7b
|
@ -23,22 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CountingOutputStream;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.common.exception.SanitizableException;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.error.ErrorResponse;
|
||||
import org.apache.druid.error.QueryExceptionCompat;
|
||||
import org.apache.druid.guice.annotations.MSQ;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Yielder;
|
||||
import org.apache.druid.java.util.common.guava.Yielders;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.BadQueryException;
|
||||
import org.apache.druid.query.QueryCapacityExceededException;
|
||||
import org.apache.druid.query.QueryException;
|
||||
import org.apache.druid.query.QueryInterruptedException;
|
||||
import org.apache.druid.query.QueryTimeoutException;
|
||||
import org.apache.druid.query.QueryUnsupportedException;
|
||||
import org.apache.druid.query.ResourceLimitExceededException;
|
||||
import org.apache.druid.server.QueryResponse;
|
||||
import org.apache.druid.server.initialization.ServerConfig;
|
||||
import org.apache.druid.server.security.Access;
|
||||
|
@ -47,7 +41,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
|
|||
import org.apache.druid.server.security.ForbiddenException;
|
||||
import org.apache.druid.sql.DirectStatement;
|
||||
import org.apache.druid.sql.HttpStatement;
|
||||
import org.apache.druid.sql.SqlPlanningException;
|
||||
import org.apache.druid.sql.SqlRowTransformer;
|
||||
import org.apache.druid.sql.SqlStatementFactory;
|
||||
import org.apache.druid.sql.http.ResultFormat;
|
||||
|
@ -63,7 +56,6 @@ import javax.ws.rs.Produces;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -162,43 +154,32 @@ public class SqlTaskResource
|
|||
}
|
||||
catch (DruidException e) {
|
||||
stmt.reporter().failed(e);
|
||||
return Response.status(e.getStatusCode())
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.entity(new ErrorResponse(e))
|
||||
.build();
|
||||
return buildNonOkResponse(sqlQueryId, e);
|
||||
}
|
||||
// Kitchen-sinking the errors since they are all unchecked.
|
||||
// Just copied from SqlResource.
|
||||
catch (QueryCapacityExceededException cap) {
|
||||
stmt.reporter().failed(cap);
|
||||
return buildNonOkResponse(QueryCapacityExceededException.STATUS_CODE, cap, sqlQueryId);
|
||||
}
|
||||
catch (QueryUnsupportedException unsupported) {
|
||||
stmt.reporter().failed(unsupported);
|
||||
return buildNonOkResponse(QueryUnsupportedException.STATUS_CODE, unsupported, sqlQueryId);
|
||||
}
|
||||
catch (QueryTimeoutException timeout) {
|
||||
stmt.reporter().failed(timeout);
|
||||
return buildNonOkResponse(QueryTimeoutException.STATUS_CODE, timeout, sqlQueryId);
|
||||
}
|
||||
catch (SqlPlanningException | ResourceLimitExceededException e) {
|
||||
stmt.reporter().failed(e);
|
||||
return buildNonOkResponse(BadQueryException.STATUS_CODE, e, sqlQueryId);
|
||||
catch (QueryException queryException) {
|
||||
stmt.reporter().failed(queryException);
|
||||
final DruidException underlyingException = DruidException.fromFailure(new QueryExceptionCompat(queryException));
|
||||
return buildNonOkResponse(sqlQueryId, underlyingException);
|
||||
}
|
||||
catch (ForbiddenException e) {
|
||||
// No request logs for forbidden queries; same as SqlResource
|
||||
throw (ForbiddenException) serverConfig.getErrorResponseTransformStrategy()
|
||||
.transformIfNeeded(e); // let ForbiddenExceptionMapper handle this
|
||||
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
|
||||
return buildNonOkResponse(
|
||||
"forbidden",
|
||||
DruidException.forPersona(DruidException.Persona.USER)
|
||||
.ofCategory(DruidException.Category.FORBIDDEN)
|
||||
.build(Access.DEFAULT_ERROR_MESSAGE)
|
||||
);
|
||||
}
|
||||
// Calcite throws a java.lang.AssertionError which is type Error not Exception. Using Throwable catches both.
|
||||
catch (Throwable e) {
|
||||
// Calcite throws java.lang.AssertionError at various points in planning/validation.
|
||||
catch (AssertionError | Exception e) {
|
||||
stmt.reporter().failed(e);
|
||||
log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId);
|
||||
|
||||
return buildNonOkResponse(
|
||||
Status.INTERNAL_SERVER_ERROR.getStatusCode(),
|
||||
QueryInterruptedException.wrapIfNeeded(e),
|
||||
sqlQueryId
|
||||
sqlQueryId,
|
||||
DruidException.forPersona(DruidException.Persona.DEVELOPER)
|
||||
.ofCategory(DruidException.Category.UNCATEGORIZED)
|
||||
.build(e.getMessage())
|
||||
);
|
||||
}
|
||||
finally {
|
||||
|
@ -237,12 +218,16 @@ public class SqlTaskResource
|
|||
yielder.close();
|
||||
|
||||
if (taskId == null) {
|
||||
// Note: no ID to include in error: that is the problem we're reporting.
|
||||
return genericError(
|
||||
Response.Status.INTERNAL_SERVER_ERROR,
|
||||
"Internal error",
|
||||
"Failed to issue query task",
|
||||
null
|
||||
// Note: no ID to include in error: that is the problem we're reporting. It would be really nice to know
|
||||
// why we don't have an ID or more information about why things failed. Hopefully that gets returned to the
|
||||
// user through a DruidExcpetion that makes it out of this code and this code never actually gets executed.
|
||||
// Using a defensive exception just to report something with the opes that any time this actually happens, the
|
||||
// fix is to make error reporting somewhere that actually understands more about why it failed.
|
||||
return buildNonOkResponse(
|
||||
null,
|
||||
DruidException.forPersona(DruidException.Persona.DEVELOPER)
|
||||
.ofCategory(DruidException.Category.DEFENSIVE)
|
||||
.build("Failed to issue query task")
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -252,6 +237,7 @@ public class SqlTaskResource
|
|||
.build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
private Response buildStandardResponse(
|
||||
Sequence<Object[]> sequence,
|
||||
SqlQuery sqlQuery,
|
||||
|
@ -316,22 +302,11 @@ public class SqlTaskResource
|
|||
}
|
||||
}
|
||||
|
||||
private Response buildNonOkResponse(int status, SanitizableException e, String sqlQueryId)
|
||||
{
|
||||
QueryException cleaned = (QueryException) serverConfig
|
||||
.getErrorResponseTransformStrategy()
|
||||
.transformIfNeeded(e);
|
||||
return Response
|
||||
.status(status)
|
||||
.entity(new SqlTaskStatus(sqlQueryId, TaskState.FAILED, cleaned))
|
||||
.build();
|
||||
}
|
||||
|
||||
private Response genericError(Response.Status status, String code, String msg, String id)
|
||||
private Response buildNonOkResponse(String sqlQueryId, DruidException exception)
|
||||
{
|
||||
return Response
|
||||
.status(status)
|
||||
.entity(new SqlTaskStatus(id, TaskState.FAILED, new QueryException("FAILED", msg, null, null)))
|
||||
.status(exception.getStatusCode())
|
||||
.entity(new SqlTaskStatus(sqlQueryId, TaskState.FAILED, new ErrorResponse(exception)))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.error.ErrorResponse;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.query.QueryException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Objects;
|
||||
|
@ -37,13 +37,13 @@ public class SqlTaskStatus
|
|||
private final String taskId;
|
||||
private final TaskState state;
|
||||
@Nullable
|
||||
private final QueryException error;
|
||||
private final ErrorResponse error;
|
||||
|
||||
@JsonCreator
|
||||
public SqlTaskStatus(
|
||||
@JsonProperty("taskId") final String taskId,
|
||||
@JsonProperty("state") final TaskState state,
|
||||
@JsonProperty("error") @Nullable final QueryException error
|
||||
@JsonProperty("error") @Nullable final ErrorResponse error
|
||||
)
|
||||
{
|
||||
this.taskId = Preconditions.checkNotNull(taskId, "taskId");
|
||||
|
@ -66,7 +66,7 @@ public class SqlTaskStatus
|
|||
@Nullable
|
||||
@JsonProperty
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public QueryException getError()
|
||||
public ErrorResponse getError()
|
||||
{
|
||||
return error;
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ public class SqlTaskStatus
|
|||
return "SqlTaskStatus{" +
|
||||
"taskId='" + taskId + '\'' +
|
||||
", state=" + state +
|
||||
", error=" + error +
|
||||
", error=" + (error == null ? "null" : error.getAsMap()) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,9 @@ package org.apache.druid.msq.sql;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.error.ErrorResponse;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.query.QueryException;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -34,22 +35,23 @@ public class SqlTaskStatusTest
|
|||
{
|
||||
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
|
||||
DruidException underlyingException = DruidException.forPersona(DruidException.Persona.DEVELOPER)
|
||||
.ofCategory(DruidException.Category.INVALID_INPUT)
|
||||
.build("error message");
|
||||
|
||||
final SqlTaskStatus status = new SqlTaskStatus(
|
||||
"taskid",
|
||||
TaskState.FAILED,
|
||||
new QueryException(
|
||||
"error code",
|
||||
"error message",
|
||||
"error class",
|
||||
"host"
|
||||
)
|
||||
new ErrorResponse(underlyingException)
|
||||
);
|
||||
|
||||
final SqlTaskStatus status2 = mapper.readValue(mapper.writeValueAsString(status), SqlTaskStatus.class);
|
||||
|
||||
Assert.assertEquals(status.getTaskId(), status2.getTaskId());
|
||||
Assert.assertEquals(status.getState(), status2.getState());
|
||||
Assert.assertEquals(status.getError().getErrorCode(), status2.getError().getErrorCode());
|
||||
Assert.assertNotNull(status.getError());
|
||||
Assert.assertNotNull(status2.getError());
|
||||
Assert.assertEquals(status.getError().getAsMap(), status2.getError().getAsMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -315,6 +315,10 @@ public class DruidException extends RuntimeException
|
|||
* Means that the error is a problem with authorization.
|
||||
*/
|
||||
UNAUTHORIZED(401),
|
||||
/**
|
||||
* Means that an action that was attempted is forbidden
|
||||
*/
|
||||
FORBIDDEN(403),
|
||||
/**
|
||||
* Means that some capacity limit was exceeded, this could be due to throttling or due to some system limit
|
||||
*/
|
||||
|
|
|
@ -26,8 +26,6 @@ package org.apache.druid.query;
|
|||
*/
|
||||
public abstract class BadQueryException extends QueryException
|
||||
{
|
||||
public static final int STATUS_CODE = 400;
|
||||
|
||||
protected BadQueryException(String errorCode, String errorMessage, String errorClass)
|
||||
{
|
||||
this(errorCode, errorMessage, errorClass, null);
|
||||
|
|
Loading…
Reference in New Issue