From e09eb413401bf5fdc030f20a3fec873447f99831 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 8 Aug 2017 10:26:13 -0400 Subject: [PATCH] SQL: make JdbcAction transport friendly (elastic/x-pack-elasticsearch#2200) Adds proper serialization to JdbcAction's requests and responses Original commit: elastic/x-pack-elasticsearch@22d2c0582e8a1122fec34ece961aeb2e67d053d5 --- .../elasticsearch/xpack/sql/JdbcActionIT.java | 42 ++++++++++ .../xpack/sql/JdbcRequestTests.java | 29 +++++++ .../xpack/sql/JdbcResponseTests.java | 32 ++++++++ .../sql/jdbc/net/protocol/InfoRequest.java | 2 +- .../plugin/jdbc/action/JdbcHttpHandler.java | 20 +---- .../sql/plugin/jdbc/action/JdbcRequest.java | 72 ++++++++++++++--- .../jdbc/action/JdbcRequestBuilder.java | 6 +- .../sql/plugin/jdbc/action/JdbcResponse.java | 81 ++++++++++++++++--- .../jdbc/action/TransportJdbcAction.java | 14 +++- 9 files changed, 253 insertions(+), 45 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcActionIT.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcRequestTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcResponseTests.java diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcActionIT.java new file mode 100644 index 00000000000..1356d731d89 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcActionIT.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse; +import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction; +import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse; +import org.elasticsearch.xpack.sql.protocol.shared.Request; +import org.elasticsearch.xpack.sql.protocol.shared.Response; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +public class JdbcActionIT extends AbstractSqlIntegTestCase { + + public void testJdbcAction() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").get()); + client().prepareBulk() + .add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42)) + .add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + ensureYellow("test"); + + Request request = new MetaTableRequest("test"); + + JdbcResponse jdbcResponse = client().prepareExecute(JdbcAction.INSTANCE).request(request).get(); + Response response = jdbcResponse.response(request); + assertThat(response, instanceOf(MetaTableResponse.class)); + MetaTableResponse metaTableResponse = (MetaTableResponse) response; + assertThat(metaTableResponse.tables, hasSize(1)); + assertThat(metaTableResponse.tables.get(0), equalTo("test")); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcRequestTests.java new file mode 100644 index 00000000000..e58af813c99 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcRequestTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest; +import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcRequest; + +public class JdbcRequestTests extends AbstractStreamableTestCase { + + @Override + protected JdbcRequest createTestInstance() { + if (randomBoolean()) { + return new JdbcRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10), + randomAlphaOfLength(10), randomAlphaOfLength(10))); + } else { + return new JdbcRequest(new MetaTableRequest(randomAlphaOfLength(10))); + } + } + + @Override + protected JdbcRequest createBlankInstance() { + return new JdbcRequest(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcResponseTests.java new file mode 100644 index 00000000000..740238c8bbb --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/JdbcResponseTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse; +import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse; + +import java.util.Collections; + +public class JdbcResponseTests extends AbstractStreamableTestCase { + + @Override + protected JdbcResponse createTestInstance() { + if (randomBoolean()) { + return new JdbcResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10), + randomByte(), randomByte(), + randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10))); + } else { + return new JdbcResponse(new MetaTableResponse(Collections.singletonList(randomAlphaOfLength(10)))); + } + } + + @Override + protected JdbcResponse createBlankInstance() { + return new JdbcResponse(); + } +} \ No newline at end of file diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/InfoRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/InfoRequest.java index fb18a0e9d47..2c10d04fc41 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/InfoRequest.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/InfoRequest.java @@ -22,7 +22,7 @@ public class InfoRequest extends AbstractInfoRequest { super(); } - InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) { + public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) { super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcHttpHandler.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcHttpHandler.java index 5c1a804a743..ea0523b573a 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcHttpHandler.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcHttpHandler.java @@ -44,33 +44,19 @@ public class JdbcHttpHandler extends BaseRestHandler { // NOCOMMIT these are cal try (DataInputStream in = new DataInputStream(request.content().streamInput())) { try { return c -> client.executeLocally(JdbcAction.INSTANCE, new JdbcRequest(Proto.INSTANCE.readRequest(in)), - wrap(response -> jdbcResponse(c, response), ex -> error(c, ex))); + wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())), + ex -> error(c, ex))); } catch (Exception ex) { return badProto("Unknown message"); } } } - + private static RestChannelConsumer badProto(String message) { return c -> c.sendResponse(new BytesRestResponse(BAD_REQUEST, TEXT_CONTENT_TYPE, message)); } - private void jdbcResponse(RestChannel channel, JdbcResponse response) { - BytesRestResponse restResponse = null; - - try { - // NOCOMMIT use the real version - restResponse = new BytesRestResponse(OK, TEXT_CONTENT_TYPE, - AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response.response())); - } catch (IOException ex) { - logger.error("error building jdbc response", ex); - restResponse = new BytesRestResponse(INTERNAL_SERVER_ERROR, TEXT_CONTENT_TYPE, StringUtils.EMPTY); - } - - channel.sendResponse(restResponse); - } - private void error(RestChannel channel, Exception ex) { logger.debug("failed to parse sql request", ex); BytesRestResponse response = null; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequest.java index 108a0b3252b..8360d49395d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequest.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequest.java @@ -8,43 +8,77 @@ package org.elasticsearch.xpack.sql.plugin.jdbc.action; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto; import org.elasticsearch.xpack.sql.protocol.shared.Request; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; public class JdbcRequest extends ActionRequest implements CompositeIndicesRequest { - private Request request; + private BytesReference bytesReference; - public JdbcRequest() {} + public JdbcRequest() { + } public JdbcRequest(Request request) { - this.request = request; + try { + request(request); + } catch (IOException ex) { + throw new IllegalArgumentException("cannot serialize the request", ex); + } + } + + public JdbcRequest(BytesReference bytesReference) { + this.bytesReference = bytesReference; } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (request == null) { + if (bytesReference == null) { validationException = addValidationError("no request has been specified", validationException); } return validationException; } - public Request request() { - return request; + /** + * Gets the response object from internally stored serialized version + */ + public Request request() throws IOException { + try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { + return Proto.INSTANCE.readRequest(in); + } } - public JdbcRequest request(Request request) { - this.request = request; + /** + * Converts the response object into internally stored serialized version + */ + public JdbcRequest request(Request request) throws IOException { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + Proto.INSTANCE.writeRequest(request, dataOutputStream); + } + bytesReference = bytesStreamOutput.bytes(); + } return this; } + public BytesReference bytesReference() { + return bytesReference; + } + @Override public int hashCode() { - return Objects.hash(request); + return Objects.hash(bytesReference); } @Override @@ -58,11 +92,27 @@ public class JdbcRequest extends ActionRequest implements CompositeIndicesReques } JdbcRequest other = (JdbcRequest) obj; - return Objects.equals(request, other.request); + return Objects.equals(bytesReference, other.bytesReference); } @Override public String getDescription() { - return "SQL JDBC [" + request + "]"; + try { + return "SQL JDBC [" + request() + "]"; + } catch (IOException ex) { + return "SQL JDBC [" + ex.getMessage() + "]"; + } + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + bytesReference = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(bytesReference); } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequestBuilder.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequestBuilder.java index a07516b83af..be4ca0fddc6 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequestBuilder.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcRequestBuilder.java @@ -9,17 +9,19 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.sql.protocol.shared.Request; +import java.io.IOException; + public class JdbcRequestBuilder extends ActionRequestBuilder { public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action) { - this(client, action, null); + super(client, action, new JdbcRequest()); } public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action, Request req) { super(client, action, new JdbcRequest(req)); } - public JdbcRequestBuilder request(Request req) { + public JdbcRequestBuilder request(Request req) throws IOException { request.request(req); return this; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcResponse.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcResponse.java index 6c40f91263c..7350f0d33be 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcResponse.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/JdbcResponse.java @@ -6,30 +6,87 @@ package org.elasticsearch.xpack.sql.plugin.jdbc.action; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto; +import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; -import org.elasticsearch.xpack.sql.session.RowSetCursor; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Objects; public class JdbcResponse extends ActionResponse { + private BytesReference bytesReference; - private Response response; - private RowSetCursor cursor; + public JdbcResponse() { + } - public JdbcResponse() {} + public JdbcResponse(BytesReference bytesReference) { + this.bytesReference = bytesReference; + } public JdbcResponse(Response response) { - this(response, null); + try { + response(response); + } catch (IOException ex) { + throw new IllegalArgumentException("cannot serialize the request", ex); + } } - public JdbcResponse(Response response, RowSetCursor cursor) { - this.response = response; - this.cursor = cursor; + /** + * Gets the response object from internally stored serialized version + * + * @param request the request that was used to generate this response + */ + public Response response(Request request) throws IOException { + try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { + return Proto.INSTANCE.readResponse(request, in); + } } - public Response response() { - return response; + /** + * Serialized the response object into internally stored serialized version + */ + public JdbcResponse response(Response response) throws IOException { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream); + } + bytesReference = bytesStreamOutput.bytes(); + } + return this; } - public RowSetCursor cursor() { - return cursor; + public BytesReference bytesReference() { + return bytesReference; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + bytesReference = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(bytesReference); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JdbcResponse that = (JdbcResponse) o; + return Objects.equals(bytesReference, that.bytesReference); + } + + @Override + public int hashCode() { + return Objects.hash(bytesReference); } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java index ba2d19f9eb5..10ec34304f9 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java @@ -18,6 +18,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.IOException; import static org.elasticsearch.xpack.sql.util.ActionUtils.chain; @@ -36,8 +39,15 @@ public class TransportJdbcAction extends HandledTransportAction listener) { + protected void doExecute(JdbcRequest jdbcRequest, ActionListener listener) { + final Request request; + try { + request = jdbcRequest.request(); + } catch (IOException ex) { + listener.onFailure(ex); + return; + } // NOCOMMIT looks like this runs on the netty threadpool which might be bad. If we go async immediately it is ok, but we don't. - jdbcServer.handle(request.request(), chain(listener, JdbcResponse::new)); + jdbcServer.handle(request, chain(listener, JdbcResponse::new)); } } \ No newline at end of file