SQL: Stop serializing errors for CLI and JDBC (elastic/x-pack-elasticsearch#3034)
Now that we can parse Elasticsearch's standard error messages in the CLI and JDBC client we can just let those standard error messages bubble out of Elasticsearch rather than catch and encode them. In a followup we can remove the encoding entirely. Original commit: elastic/x-pack-elasticsearch@bad043b6f7
This commit is contained in:
parent
b8e082107f
commit
fce5b494be
|
@ -64,8 +64,14 @@ public class CliHttpClient {
|
||||||
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
|
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
|
||||||
failure.type(), failure.remoteTrace());
|
failure.type(), failure.remoteTrace());
|
||||||
}
|
}
|
||||||
|
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
|
||||||
|
if (type == null) {
|
||||||
|
return new ErrorResponse((RequestType) request.requestType(),
|
||||||
|
"Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]",
|
||||||
|
failure.type(), failure.remoteTrace());
|
||||||
|
}
|
||||||
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
|
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
|
||||||
failure.type(), SqlExceptionType.fromRemoteFailureType(failure.type()));
|
failure.type(), type);
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -77,8 +77,14 @@ class HttpClient {
|
||||||
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
|
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
|
||||||
failure.type(), failure.remoteTrace());
|
failure.type(), failure.remoteTrace());
|
||||||
}
|
}
|
||||||
|
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
|
||||||
|
if (type == null) {
|
||||||
|
return new ErrorResponse((RequestType) request.requestType(),
|
||||||
|
"Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]",
|
||||||
|
failure.type(), failure.remoteTrace());
|
||||||
|
}
|
||||||
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
|
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
|
||||||
failure.type(), SqlExceptionType.fromRemoteFailureType(failure.type()));
|
failure.type(), type);
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class JdbcHttpClient {
|
||||||
if (response.responseType() == ResponseType.ERROR) {
|
if (response.responseType() == ResponseType.ERROR) {
|
||||||
ErrorResponse error = (ErrorResponse) response;
|
ErrorResponse error = (ErrorResponse) response;
|
||||||
// TODO: this could be made configurable to switch between message to error
|
// TODO: this could be made configurable to switch between message to error
|
||||||
throw new JdbcSQLException("Server returned error: [" + error.stack + "]");
|
throw new JdbcSQLException("Server returned error: [" + error.message + "][" + error.stack + "]");
|
||||||
}
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,9 @@ import org.elasticsearch.rest.BaseRestHandler;
|
||||||
import org.elasticsearch.rest.BytesRestResponse;
|
import org.elasticsearch.rest.BytesRestResponse;
|
||||||
import org.elasticsearch.rest.RestChannel;
|
import org.elasticsearch.rest.RestChannel;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.RestResponse;
|
||||||
import org.elasticsearch.rest.RestRequest.Method;
|
import org.elasticsearch.rest.RestRequest.Method;
|
||||||
|
import org.elasticsearch.rest.action.RestResponseListener;
|
||||||
import org.elasticsearch.xpack.sql.ClientSqlException;
|
import org.elasticsearch.xpack.sql.ClientSqlException;
|
||||||
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
|
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
|
||||||
import org.elasticsearch.xpack.sql.analysis.catalog.MappingException;
|
import org.elasticsearch.xpack.sql.analysis.catalog.MappingException;
|
||||||
|
@ -55,29 +57,16 @@ public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler {
|
||||||
|
|
||||||
protected abstract RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException;
|
protected abstract RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException;
|
||||||
|
|
||||||
protected abstract AbstractExceptionResponse buildExceptionResponse(Request request, String message, String cause,
|
protected <T> ActionListener<T> toActionListener(RestChannel channel, Function<T, Response> responseBuilder) {
|
||||||
SqlExceptionType exceptionType);
|
return new RestResponseListener<T>(channel) {
|
||||||
|
|
||||||
protected abstract AbstractErrorResponse buildErrorResponse(Request request, String message, String cause, String stack);
|
|
||||||
|
|
||||||
protected <T> ActionListener<T> toActionListener(Request request, RestChannel channel, Function<T, Response> responseBuilder) {
|
|
||||||
return new ActionListener<T>() {
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(T response) {
|
public RestResponse buildResponse(T response) throws Exception {
|
||||||
try {
|
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
|
||||||
sendResponse(channel, responseBuilder.apply(response));
|
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
|
||||||
} catch (Exception e) {
|
// NOCOMMIT use the version from the client
|
||||||
onFailure(e);
|
proto.writeResponse(responseBuilder.apply(response), Proto.CURRENT_VERSION, dataOutputStream);
|
||||||
}
|
}
|
||||||
}
|
return new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes());
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
try {
|
|
||||||
sendResponse(channel, exceptionResponse(request, e));
|
|
||||||
} catch (Exception inner) {
|
|
||||||
inner.addSuppressed(e);
|
|
||||||
logger.error("failed to send failure response", inner);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -92,63 +81,6 @@ public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler {
|
||||||
try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) {
|
try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) {
|
||||||
request = proto.readRequest(in);
|
request = proto.readRequest(in);
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
return innerPrepareRequest(request, client);
|
return innerPrepareRequest(request, client);
|
||||||
} catch (Exception e) {
|
|
||||||
return channel -> sendResponse(channel, exceptionResponse(request, e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Response exceptionResponse(Request request, Exception e) {
|
|
||||||
// TODO I wonder why we don't just teach the servers to handle ES's normal exception response.....
|
|
||||||
SqlExceptionType exceptionType = sqlExceptionType(e);
|
|
||||||
|
|
||||||
String message = EMPTY;
|
|
||||||
String cs = EMPTY;
|
|
||||||
if (Strings.hasText(e.getMessage())) {
|
|
||||||
message = e.getMessage();
|
|
||||||
}
|
|
||||||
cs = e.getClass().getName();
|
|
||||||
|
|
||||||
if (exceptionType != null) {
|
|
||||||
return buildExceptionResponse(request, message, cs, exceptionType);
|
|
||||||
} else {
|
|
||||||
StringWriter sw = new StringWriter();
|
|
||||||
e.printStackTrace(new PrintWriter(sw));
|
|
||||||
// TODO: the stack shouldn't be necessary unless for advance diagnostics
|
|
||||||
return buildErrorResponse(request, message, cs, sw.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SqlExceptionType sqlExceptionType(Throwable cause) {
|
|
||||||
if (cause instanceof ClientSqlException) {
|
|
||||||
if (cause instanceof AnalysisException || cause instanceof ResourceNotFoundException) {
|
|
||||||
return SqlExceptionType.DATA;
|
|
||||||
}
|
|
||||||
if (cause instanceof PlanningException || cause instanceof MappingException) {
|
|
||||||
return SqlExceptionType.NOT_SUPPORTED;
|
|
||||||
}
|
|
||||||
if (cause instanceof ParsingException) {
|
|
||||||
return SqlExceptionType.SYNTAX;
|
|
||||||
}
|
|
||||||
if (cause instanceof PlanningException) {
|
|
||||||
return SqlExceptionType.NOT_SUPPORTED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (cause instanceof TimeoutException) {
|
|
||||||
return SqlExceptionType.TIMEOUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendResponse(RestChannel channel, Response response) throws IOException {
|
|
||||||
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
|
|
||||||
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
|
|
||||||
// NOCOMMIT use the version from the client
|
|
||||||
proto.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
|
|
||||||
}
|
|
||||||
channel.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,16 +56,6 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
||||||
return consumer::accept;
|
return consumer::accept;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) {
|
|
||||||
return new ErrorResponse((RequestType) request.requestType(), message, cause, stack);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause, SqlExceptionType exceptionType) {
|
|
||||||
return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actual implementation of the operation
|
* Actual implementation of the operation
|
||||||
*/
|
*/
|
||||||
|
@ -74,7 +64,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
||||||
RequestType requestType = (RequestType) request.requestType();
|
RequestType requestType = (RequestType) request.requestType();
|
||||||
switch (requestType) {
|
switch (requestType) {
|
||||||
case INFO:
|
case INFO:
|
||||||
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(request, channel, response ->
|
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response ->
|
||||||
new InfoResponse(response.getNodeName(), response.getClusterName().value(),
|
new InfoResponse(response.getNodeName(), response.getClusterName().value(),
|
||||||
response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
|
response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
|
||||||
response.getBuild().shortHash(), response.getBuild().date())));
|
response.getBuild().shortHash(), response.getBuild().date())));
|
||||||
|
@ -94,7 +84,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
||||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||||
Cursor.EMPTY);
|
Cursor.EMPTY);
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||||
CliFormatter formatter = new CliFormatter(response);
|
CliFormatter formatter = new CliFormatter(response);
|
||||||
String data = formatter.formatWithHeader(response);
|
String data = formatter.formatWithHeader(response);
|
||||||
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
||||||
|
@ -116,7 +106,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
||||||
cursor);
|
cursor);
|
||||||
|
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||||
String data = formatter.formatWithoutHeader(response);
|
String data = formatter.formatWithoutHeader(response);
|
||||||
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -79,16 +79,6 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
return consumer::accept;
|
return consumer::accept;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) {
|
|
||||||
return new ErrorResponse((RequestType) request.requestType(), message, cause, stack);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause, SqlExceptionType exceptionType) {
|
|
||||||
return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actual implementation of the operation
|
* Actual implementation of the operation
|
||||||
*/
|
*/
|
||||||
|
@ -97,7 +87,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
RequestType requestType = (RequestType) request.requestType();
|
RequestType requestType = (RequestType) request.requestType();
|
||||||
switch (requestType) {
|
switch (requestType) {
|
||||||
case INFO:
|
case INFO:
|
||||||
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(request, channel, response ->
|
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response ->
|
||||||
new InfoResponse(response.getNodeName(), response.getClusterName().value(),
|
new InfoResponse(response.getNodeName(), response.getClusterName().value(),
|
||||||
response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
|
response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
|
||||||
response.getBuild().shortHash(), response.getBuild().date())));
|
response.getBuild().shortHash(), response.getBuild().date())));
|
||||||
|
@ -118,7 +108,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*";
|
String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*";
|
||||||
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
||||||
getRequest.local(true); // TODO serialization not supported by get indices action
|
getRequest.local(true); // TODO serialization not supported by get indices action
|
||||||
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> {
|
||||||
return new MetaTableResponse(response.indices().stream()
|
return new MetaTableResponse(response.indices().stream()
|
||||||
.map(EsIndex::name)
|
.map(EsIndex::name)
|
||||||
.collect(toList()));
|
.collect(toList()));
|
||||||
|
@ -131,7 +121,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
|
|
||||||
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
||||||
getRequest.local(true); // TODO serialization not supported by get indices action
|
getRequest.local(true); // TODO serialization not supported by get indices action
|
||||||
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> {
|
||||||
List<MetaColumnInfo> columns = new ArrayList<>();
|
List<MetaColumnInfo> columns = new ArrayList<>();
|
||||||
for (EsIndex esIndex : response.indices()) {
|
for (EsIndex esIndex : response.indices()) {
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
|
@ -155,7 +145,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||||
Cursor.EMPTY);
|
Cursor.EMPTY);
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||||
List<JDBCType> types = new ArrayList<>(response.columns().size());
|
List<JDBCType> types = new ArrayList<>(response.columns().size());
|
||||||
List<ColumnInfo> columns = new ArrayList<>(response.columns().size());
|
List<ColumnInfo> columns = new ArrayList<>(response.columns().size());
|
||||||
for (SqlResponse.ColumnInfo info : response.columns()) {
|
for (SqlResponse.ColumnInfo info : response.columns()) {
|
||||||
|
@ -182,7 +172,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||||
cursor);
|
cursor);
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
|
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||||
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
|
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
|
||||||
new SqlResponsePayload(types, response.rows()));
|
new SqlResponsePayload(types, response.rows()));
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -89,15 +89,16 @@ public abstract class AbstractProto {
|
||||||
|
|
||||||
public static SqlExceptionType fromRemoteFailureType(String type) {
|
public static SqlExceptionType fromRemoteFailureType(String type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case "analysis":
|
case "analysis_exception":
|
||||||
case "resouce_not_found":
|
case "resource_not_found_exception":
|
||||||
|
case "verification_exception":
|
||||||
return DATA;
|
return DATA;
|
||||||
case "planning":
|
case "planning_exception":
|
||||||
case "mapping":
|
case "mapping_exception":
|
||||||
return NOT_SUPPORTED;
|
return NOT_SUPPORTED;
|
||||||
case "parsing":
|
case "parsing_exception":
|
||||||
return SYNTAX;
|
return SYNTAX;
|
||||||
case "timeout":
|
case "timeout_exception":
|
||||||
return TIMEOUT;
|
return TIMEOUT;
|
||||||
default:
|
default:
|
||||||
return null;
|
return null;
|
||||||
|
|
Loading…
Reference in New Issue