diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java index 278969ebae2..cb44618a772 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java @@ -64,8 +64,14 @@ public class CliHttpClient { return new ErrorResponse((RequestType) request.requestType(), failure.reason(), failure.type(), failure.remoteTrace()); } + SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type()); + if (type == null) { + return new ErrorResponse((RequestType) request.requestType(), + "Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]", + failure.type(), failure.remoteTrace()); + } return new ExceptionResponse((RequestType) request.requestType(), failure.reason(), - failure.type(), SqlExceptionType.fromRemoteFailureType(failure.type())); + failure.type(), type); } ) ) diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java index d4ccc3ff95e..d4fbb417952 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java @@ -77,8 +77,14 @@ class HttpClient { return new ErrorResponse((RequestType) request.requestType(), failure.reason(), failure.type(), failure.remoteTrace()); } + SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type()); + if (type == null) { + return new ErrorResponse((RequestType) request.requestType(), + "Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]", + failure.type(), failure.remoteTrace()); + } return new ExceptionResponse((RequestType) request.requestType(), failure.reason(), - failure.type(), SqlExceptionType.fromRemoteFailureType(failure.type())); + failure.type(), type); } ) ) diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java index b460e89feee..2309a19276b 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java @@ -115,7 +115,7 @@ public class JdbcHttpClient { if (response.responseType() == ResponseType.ERROR) { ErrorResponse error = (ErrorResponse) response; // TODO: this could be made configurable to switch between message to error - throw new JdbcSQLException("Server returned error: [" + error.stack + "]"); + throw new JdbcSQLException("Server returned error: [" + error.message + "][" + error.stack + "]"); } return response; } @@ -131,4 +131,4 @@ public class JdbcHttpClient { } return new TimeoutInfo(clientTime, timeout, conCfg.pageTimeout()); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java index 1774c139f51..85f7f04a14a 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java @@ -17,7 +17,9 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestRequest.Method; +import org.elasticsearch.rest.action.RestResponseListener; import org.elasticsearch.xpack.sql.ClientSqlException; import org.elasticsearch.xpack.sql.analysis.AnalysisException; import org.elasticsearch.xpack.sql.analysis.catalog.MappingException; @@ -55,29 +57,16 @@ public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler { protected abstract RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException; - protected abstract AbstractExceptionResponse buildExceptionResponse(Request request, String message, String cause, - SqlExceptionType exceptionType); - - protected abstract AbstractErrorResponse buildErrorResponse(Request request, String message, String cause, String stack); - - protected ActionListener toActionListener(Request request, RestChannel channel, Function responseBuilder) { - return new ActionListener() { + protected ActionListener toActionListener(RestChannel channel, Function responseBuilder) { + return new RestResponseListener(channel) { @Override - public void onResponse(T response) { - try { - sendResponse(channel, responseBuilder.apply(response)); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - sendResponse(channel, exceptionResponse(request, e)); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.error("failed to send failure response", inner); + public RestResponse buildResponse(T response) throws Exception { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + // NOCOMMIT use the version from the client + proto.writeResponse(responseBuilder.apply(response), Proto.CURRENT_VERSION, dataOutputStream); + } + return new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes()); } } }; @@ -92,63 +81,6 @@ public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler { try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) { request = proto.readRequest(in); } - try { - return innerPrepareRequest(request, client); - } catch (Exception e) { - return channel -> sendResponse(channel, exceptionResponse(request, e)); - } - } - - private Response exceptionResponse(Request request, Exception e) { - // TODO I wonder why we don't just teach the servers to handle ES's normal exception response..... - SqlExceptionType exceptionType = sqlExceptionType(e); - - String message = EMPTY; - String cs = EMPTY; - if (Strings.hasText(e.getMessage())) { - message = e.getMessage(); - } - cs = e.getClass().getName(); - - if (exceptionType != null) { - return buildExceptionResponse(request, message, cs, exceptionType); - } else { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - // TODO: the stack shouldn't be necessary unless for advance diagnostics - return buildErrorResponse(request, message, cs, sw.toString()); - } - } - - private static SqlExceptionType sqlExceptionType(Throwable cause) { - if (cause instanceof ClientSqlException) { - if (cause instanceof AnalysisException || cause instanceof ResourceNotFoundException) { - return SqlExceptionType.DATA; - } - if (cause instanceof PlanningException || cause instanceof MappingException) { - return SqlExceptionType.NOT_SUPPORTED; - } - if (cause instanceof ParsingException) { - return SqlExceptionType.SYNTAX; - } - if (cause instanceof PlanningException) { - return SqlExceptionType.NOT_SUPPORTED; - } - } - if (cause instanceof TimeoutException) { - return SqlExceptionType.TIMEOUT; - } - - return null; - } - - private void sendResponse(RestChannel channel, Response response) throws IOException { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { - // NOCOMMIT use the version from the client - proto.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream); - } - channel.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes())); - } + return innerPrepareRequest(request, client); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java index c2d078c424e..aa602f76334 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java @@ -48,7 +48,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { @Override public String getName() { return "xpack_sql_cli_action"; - } + } @Override protected RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException { @@ -56,16 +56,6 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { return consumer::accept; } - @Override - protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) { - return new ErrorResponse((RequestType) request.requestType(), message, cause, stack); - } - - @Override - protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause, SqlExceptionType exceptionType) { - return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType); - } - /** * Actual implementation of the operation */ @@ -74,7 +64,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { RequestType requestType = (RequestType) request.requestType(); switch (requestType) { case INFO: - return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(request, channel, response -> + return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response -> new InfoResponse(response.getNodeName(), response.getClusterName().value(), response.getVersion().major, response.getVersion().minor, response.getVersion().toString(), response.getBuild().shortHash(), response.getBuild().date()))); @@ -94,7 +84,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { TimeValue.timeValueMillis(request.timeout.pageTimeout), Cursor.EMPTY); long start = System.nanoTime(); - return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { CliFormatter formatter = new CliFormatter(response); String data = formatter.formatWithHeader(response); return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); @@ -116,7 +106,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { cursor); long start = System.nanoTime(); - return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { String data = formatter.formatWithoutHeader(response); return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); })); @@ -134,4 +124,4 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { throw new RuntimeException("unexpected trouble building the cursor", e); } } - } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java index fc45dae7472..89f058bb6ad 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java @@ -79,16 +79,6 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { return consumer::accept; } - @Override - protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) { - return new ErrorResponse((RequestType) request.requestType(), message, cause, stack); - } - - @Override - protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause, SqlExceptionType exceptionType) { - return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType); - } - /** * Actual implementation of the operation */ @@ -97,7 +87,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { RequestType requestType = (RequestType) request.requestType(); switch (requestType) { case INFO: - return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(request, channel, response -> + return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response -> new InfoResponse(response.getNodeName(), response.getClusterName().value(), response.getVersion().major, response.getVersion().minor, response.getVersion().toString(), response.getBuild().shortHash(), response.getBuild().date()))); @@ -118,7 +108,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*"; SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern); getRequest.local(true); // TODO serialization not supported by get indices action - return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> { return new MetaTableResponse(response.indices().stream() .map(EsIndex::name) .collect(toList())); @@ -131,7 +121,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern); getRequest.local(true); // TODO serialization not supported by get indices action - return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> { List columns = new ArrayList<>(); for (EsIndex esIndex : response.indices()) { int pos = 0; @@ -140,7 +130,7 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { String name = entry.getKey(); if (columnMatcher == null || columnMatcher.matcher(name).matches()) { DataType type = entry.getValue(); - // the column size it's actually its precision (based on the Javadocs) + // the column size it's actually its precision (based on the Javadocs) columns.add(new MetaColumnInfo(esIndex.name(), name, type.sqlType(), type.precision(), pos)); } } @@ -150,12 +140,12 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { } private Consumer queryInit(Client client, QueryInitRequest request) { - SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, + SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, TimeValue.timeValueMillis(request.timeout.requestTimeout), - TimeValue.timeValueMillis(request.timeout.pageTimeout), + TimeValue.timeValueMillis(request.timeout.pageTimeout), Cursor.EMPTY); long start = System.nanoTime(); - return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { List types = new ArrayList<>(response.columns().size()); List columns = new ArrayList<>(response.columns().size()); for (SqlResponse.ColumnInfo info : response.columns()) { @@ -177,12 +167,12 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { throw new IllegalArgumentException("error reading the cursor"); } // NB: the timezone and page size are locked already by the query so pass in defaults (as they are not read anyway) - SqlRequest sqlRequest = new SqlRequest(EMPTY, SqlRequest.DEFAULT_TIME_ZONE, 0, + SqlRequest sqlRequest = new SqlRequest(EMPTY, SqlRequest.DEFAULT_TIME_ZONE, 0, TimeValue.timeValueMillis(request.timeout.requestTimeout), - TimeValue.timeValueMillis(request.timeout.pageTimeout), + TimeValue.timeValueMillis(request.timeout.pageTimeout), cursor); long start = System.nanoTime(); - return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> { + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), new SqlResponsePayload(types, response.rows())); })); @@ -203,4 +193,4 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { throw new RuntimeException("unexpected trouble building the cursor", e); } } -} \ No newline at end of file +} diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java index f24fdad9115..235d2bd4081 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java @@ -89,15 +89,16 @@ public abstract class AbstractProto { public static SqlExceptionType fromRemoteFailureType(String type) { switch (type) { - case "analysis": - case "resouce_not_found": + case "analysis_exception": + case "resource_not_found_exception": + case "verification_exception": return DATA; - case "planning": - case "mapping": + case "planning_exception": + case "mapping_exception": return NOT_SUPPORTED; - case "parsing": + case "parsing_exception": return SYNTAX; - case "timeout": + case "timeout_exception": return TIMEOUT; default: return null;