From cc41a2daa0c94fb46136bd40efb71d7cfd9aae16 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 7 Aug 2017 16:15:11 -0400 Subject: [PATCH] SQL: make CliAction transport friendly (elastic/x-pack-elasticsearch#2198) Adds proper serialization to CliAction's requests and responses Original commit: elastic/x-pack-elasticsearch@2d2a15a5ba9be2ed68c69da6bc9a7ba7aacea176 --- .../xpack/sql/AbstractSqlIntegTestCase.java | 60 ++++++++++++++ .../elasticsearch/xpack/sql/CliActionIT.java | 35 ++++++++ .../xpack/sql/CliRequestTests.java | 29 +++++++ .../xpack/sql/CliResponseTests.java | 31 +++++++ .../elasticsearch/xpack/sql/SqlActionIT.java | 53 +----------- .../sql/cli/net/protocol/InfoRequest.java | 2 +- .../sql/plugin/cli/action/CliHttpHandler.java | 31 ++----- .../sql/plugin/cli/action/CliRequest.java | 72 ++++++++++++++--- .../plugin/cli/action/CliRequestBuilder.java | 8 +- .../sql/plugin/cli/action/CliResponse.java | 80 +++++++++++++++++-- .../plugin/cli/action/TransportCliAction.java | 23 ++++-- 11 files changed, 318 insertions(+), 106 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlIntegTestCase.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/CliRequestTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/CliResponseTests.java diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlIntegTestCase.java new file mode 100644 index 00000000000..781108fad19 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlIntegTestCase.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.util.Arrays; +import java.util.Collection; + +public abstract class AbstractSqlIntegTestCase extends ESIntegTestCase { + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); + return settings.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, ReindexPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + @Override + protected Settings transportClientSettings() { + // Plugin should be loaded on the transport client as well + return nodeSettings(0); + } + + @Override + protected Collection> getMockPlugins() { + return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class); + } +} + diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java new file mode 100644 index 00000000000..2b9ceb4f7e3 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; +import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; +import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; + +public class CliActionIT extends AbstractSqlIntegTestCase { + + public void testCliAction() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").get()); + client().prepareBulk() + .add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42)) + .add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + ensureYellow("test"); + + Request request = new CommandRequest("SELECT * FROM test ORDER BY count"); + + CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get(); + assertThat(response.response(request).toString(), containsString("bar")); + assertThat(response.response(request).toString(), containsString("baz")); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/CliRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliRequestTests.java new file mode 100644 index 00000000000..ffb82ae7fa2 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliRequestTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; +import org.elasticsearch.xpack.sql.plugin.cli.action.CliRequest; + +public class CliRequestTests extends AbstractStreamableTestCase { + + @Override + protected CliRequest createTestInstance() { + if (randomBoolean()) { + return new CliRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10), + randomAlphaOfLength(10), randomAlphaOfLength(10))); + } else { + return new CliRequest(new CommandRequest(randomAlphaOfLength(10))); + } + } + + @Override + protected CliRequest createBlankInstance() { + return new CliRequest(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/CliResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliResponseTests.java new file mode 100644 index 00000000000..7f26fe685fa --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliResponseTests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse; +import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse; + +public class CliResponseTests extends AbstractStreamableTestCase { + + @Override + protected CliResponse createTestInstance() { + if (randomBoolean()) { + return new CliResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10), + randomByte(), randomByte(), + randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10))); + } else { + return new CliResponse(new CommandResponse(randomNonNegativeLong(), randomNonNegativeLong(), + randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + } + + @Override + protected CliResponse createBlankInstance() { + return new CliResponse(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java index 74b559b419d..5e5869e75f0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java @@ -7,65 +7,16 @@ package org.elasticsearch.xpack.sql; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.reindex.ReindexPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.discovery.TestZenDiscovery; -import org.elasticsearch.xpack.XPackPlugin; -import org.elasticsearch.xpack.XPackSettings; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; -import java.util.Arrays; -import java.util.Collection; import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -public class SqlActionIT extends ESIntegTestCase { - - @Override - protected boolean ignoreExternalCluster() { - return true; - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); - settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); - settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); - settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); - settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); - settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); - return settings.build(); - } - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(XPackPlugin.class, ReindexPlugin.class); - } - - @Override - protected Collection> transportClientPlugins() { - return nodePlugins(); - } - - @Override - protected Settings transportClientSettings() { - // Plugin should be loaded on the transport client as well - return nodeSettings(0); - } - - @Override - protected Collection> getMockPlugins() { - return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class); - } - +public class SqlActionIT extends AbstractSqlIntegTestCase { public void testSqlAction() throws Exception { assertAcked(client().admin().indices().prepareCreate("test").get()); @@ -94,7 +45,7 @@ public class SqlActionIT extends ESIntegTestCase { assertThat(response.rows().get(1).get("count"), equalTo(43L)); // Check that columns within each row were returned in the requested order - for(Map row : response.rows()) { + for (Map row : response.rows()) { assertThat(row.keySet().iterator().next(), equalTo(columnOrder ? "data" : "count")); } } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/InfoRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/InfoRequest.java index 28a85894236..378cb2c6a5b 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/InfoRequest.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/InfoRequest.java @@ -22,7 +22,7 @@ public class InfoRequest extends AbstractInfoRequest { super(); } - InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) { + public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) { super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java index 3bb49971d85..9aa1959166c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java @@ -14,17 +14,11 @@ import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; -import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto; -import org.elasticsearch.xpack.sql.util.StringUtils; -import java.io.DataInputStream; import java.io.IOException; import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.elasticsearch.rest.RestStatus.OK; public class CliHttpHandler extends BaseRestHandler { @@ -40,29 +34,14 @@ public class CliHttpHandler extends BaseRestHandler { throw new IllegalArgumentException("expected a request body"); } - try (DataInputStream in = new DataInputStream(request.content().streamInput())) { - CliRequest cliRequest = new CliRequest(Proto.INSTANCE.readRequest(in)); - return c -> client.executeLocally(CliAction.INSTANCE, cliRequest, - ActionListener.wrap(response -> cliResponse(c, response), ex -> error(c, ex))); - } - } - - private static void cliResponse(RestChannel channel, CliResponse response) { - BytesRestResponse restResponse = null; - - try { - // NOCOMMIT use a real version - restResponse = new BytesRestResponse(OK, TEXT_CONTENT_TYPE, - AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response.response())); - } catch (IOException ex) { - restResponse = new BytesRestResponse(INTERNAL_SERVER_ERROR, TEXT_CONTENT_TYPE, StringUtils.EMPTY); - } - - channel.sendResponse(restResponse); + CliRequest cliRequest = new CliRequest(request.content()); + return c -> client.executeLocally(CliAction.INSTANCE, cliRequest, + ActionListener.wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())), + ex -> error(c, ex))); } private static void error(RestChannel channel, Exception ex) { - BytesRestResponse response = null; + BytesRestResponse response; try { response = new BytesRestResponse(channel, ex); } catch (IOException e) { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java index 8db6476187f..91b9f7a6fd4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java @@ -8,43 +8,77 @@ package org.elasticsearch.xpack.sql.plugin.cli.action; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; import org.elasticsearch.xpack.sql.protocol.shared.Request; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; public class CliRequest extends ActionRequest implements CompositeIndicesRequest { - private Request request; + private BytesReference bytesReference; - public CliRequest() {} + public CliRequest() { + } public CliRequest(Request request) { - this.request = request; + try { + request(request); + } catch (IOException ex) { + throw new IllegalArgumentException("cannot serialize the request", ex); + } + } + + public CliRequest(BytesReference bytesReference) { + this.bytesReference = bytesReference; } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (request == null) { + if (bytesReference == null) { validationException = addValidationError("no request has been specified", validationException); } return validationException; } - public Request request() { - return request; + /** + * Gets the response object from internally stored serialized version + */ + public Request request() throws IOException { + try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { + return Proto.INSTANCE.readRequest(in); + } } - public CliRequest request(Request request) { - this.request = request; + /** + * Converts the response object into internally stored serialized version + */ + public CliRequest request(Request request) throws IOException { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + Proto.INSTANCE.writeRequest(request, dataOutputStream); + } + bytesReference = bytesStreamOutput.bytes(); + } return this; } + public BytesReference bytesReference() { + return bytesReference; + } + @Override public int hashCode() { - return Objects.hash(request); + return Objects.hash(bytesReference); } @Override @@ -58,11 +92,27 @@ public class CliRequest extends ActionRequest implements CompositeIndicesRequest } CliRequest other = (CliRequest) obj; - return Objects.equals(request, other.request); + return Objects.equals(bytesReference, other.bytesReference); } @Override public String getDescription() { - return "SQL CLI [" + request + "]"; + try { + return "SQL CLI [" + request() + "]"; + } catch (IOException ex) { + return "SQL CLI [" + ex.getMessage() + "]"; + } + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + bytesReference = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(bytesReference); } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java index b7d46d2fe82..35ab468ae89 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java @@ -9,17 +9,19 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.sql.protocol.shared.Request; +import java.io.IOException; + public class CliRequestBuilder extends ActionRequestBuilder { public CliRequestBuilder(ElasticsearchClient client, CliAction action) { - this(client, action, null); + super(client, action, new CliRequest()); } - public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) { + public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) { super(client, action, new CliRequest(req)); } - public CliRequestBuilder request(Request req) { + public CliRequestBuilder request(Request req)throws IOException { request.request(req); return this; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java index d98631cd0ea..975e42b816b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java @@ -6,23 +6,87 @@ package org.elasticsearch.xpack.sql.plugin.cli.action; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; +import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; -import org.elasticsearch.xpack.sql.session.RowSetCursor; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Objects; public class CliResponse extends ActionResponse { - private Response response; + private BytesReference bytesReference; - public CliResponse() {} + public CliResponse() { + } + + public CliResponse(BytesReference bytesReference) { + this.bytesReference = bytesReference; + } public CliResponse(Response response) { - this(response, null); + try { + response(response); + } catch (IOException ex) { + throw new IllegalArgumentException("cannot serialize the request", ex); + } } - public CliResponse(Response response, RowSetCursor cursor) { - this.response = response; + /** + * Gets the response object from internally stored serialized version + * + * @param request the request that was used to generate this response + */ + public Response response(Request request) throws IOException { + try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { + return Proto.INSTANCE.readResponse(request, in); + } } - public Response response() { - return response; + /** + * Serialized the response object into internally stored serialized version + */ + public CliResponse response(Response response) throws IOException { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream); + } + bytesReference = bytesStreamOutput.bytes(); + } + return this; + } + + public BytesReference bytesReference() { + return bytesReference; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + bytesReference = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(bytesReference); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CliResponse that = (CliResponse) o; + return Objects.equals(bytesReference, that.bytesReference); + } + + @Override + public int hashCode() { + return Objects.hash(bytesReference); } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java index ea5b16486b5..293cb096cbf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java @@ -18,6 +18,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.plugin.cli.CliServer; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.IOException; import static org.elasticsearch.xpack.sql.util.ActionUtils.chain; @@ -26,17 +29,25 @@ public class TransportCliAction extends HandledTransportAction clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); } @Override - protected void doExecute(CliRequest request, ActionListener listener) { - cliServer.handle(request.request(), chain(listener, CliResponse::new)); + protected void doExecute(CliRequest cliRequest, ActionListener listener) { + final Request request; + try { + request = cliRequest.request(); + } catch (IOException ex) { + listener.onFailure(ex); + return; + } + // NOCOMMIT we need to pass the protocol version of the client to the response here + cliServer.handle(request, chain(listener, CliResponse::new)); } } \ No newline at end of file