SQL: make JdbcAction transport friendly (elastic/x-pack-elasticsearch#2200)

Adds proper serialization to JdbcAction's requests and responses

Original commit: elastic/x-pack-elasticsearch@22d2c0582e
This commit is contained in:
Igor Motov 2017-08-08 10:26:13 -04:00 committed by GitHub
parent cc41a2daa0
commit e09eb41340
9 changed files with 253 additions and 45 deletions

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class JdbcActionIT extends AbstractSqlIntegTestCase {
public void testJdbcAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42))
.add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
Request request = new MetaTableRequest("test");
JdbcResponse jdbcResponse = client().prepareExecute(JdbcAction.INSTANCE).request(request).get();
Response response = jdbcResponse.response(request);
assertThat(response, instanceOf(MetaTableResponse.class));
MetaTableResponse metaTableResponse = (MetaTableResponse) response;
assertThat(metaTableResponse.tables, hasSize(1));
assertThat(metaTableResponse.tables.get(0), equalTo("test"));
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcRequest;
public class JdbcRequestTests extends AbstractStreamableTestCase<JdbcRequest> {
@Override
protected JdbcRequest createTestInstance() {
if (randomBoolean()) {
return new JdbcRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10),
randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new JdbcRequest(new MetaTableRequest(randomAlphaOfLength(10)));
}
}
@Override
protected JdbcRequest createBlankInstance() {
return new JdbcRequest();
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import java.util.Collections;
public class JdbcResponseTests extends AbstractStreamableTestCase<JdbcResponse> {
@Override
protected JdbcResponse createTestInstance() {
if (randomBoolean()) {
return new JdbcResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10),
randomByte(), randomByte(),
randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new JdbcResponse(new MetaTableResponse(Collections.singletonList(randomAlphaOfLength(10))));
}
}
@Override
protected JdbcResponse createBlankInstance() {
return new JdbcResponse();
}
}

View File

@ -22,7 +22,7 @@ public class InfoRequest extends AbstractInfoRequest {
super();
}
InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}

View File

@ -44,33 +44,19 @@ public class JdbcHttpHandler extends BaseRestHandler { // NOCOMMIT these are cal
try (DataInputStream in = new DataInputStream(request.content().streamInput())) {
try {
return c -> client.executeLocally(JdbcAction.INSTANCE, new JdbcRequest(Proto.INSTANCE.readRequest(in)),
wrap(response -> jdbcResponse(c, response), ex -> error(c, ex)));
wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())),
ex -> error(c, ex)));
} catch (Exception ex) {
return badProto("Unknown message");
}
}
}
private static RestChannelConsumer badProto(String message) {
return c -> c.sendResponse(new BytesRestResponse(BAD_REQUEST, TEXT_CONTENT_TYPE, message));
}
private void jdbcResponse(RestChannel channel, JdbcResponse response) {
BytesRestResponse restResponse = null;
try {
// NOCOMMIT use the real version
restResponse = new BytesRestResponse(OK, TEXT_CONTENT_TYPE,
AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response.response()));
} catch (IOException ex) {
logger.error("error building jdbc response", ex);
restResponse = new BytesRestResponse(INTERNAL_SERVER_ERROR, TEXT_CONTENT_TYPE, StringUtils.EMPTY);
}
channel.sendResponse(restResponse);
}
private void error(RestChannel channel, Exception ex) {
logger.debug("failed to parse sql request", ex);
BytesRestResponse response = null;

View File

@ -8,43 +8,77 @@ package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class JdbcRequest extends ActionRequest implements CompositeIndicesRequest {
private Request request;
private BytesReference bytesReference;
public JdbcRequest() {}
public JdbcRequest() {
}
public JdbcRequest(Request request) {
this.request = request;
try {
request(request);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
public JdbcRequest(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (request == null) {
if (bytesReference == null) {
validationException = addValidationError("no request has been specified", validationException);
}
return validationException;
}
public Request request() {
return request;
/**
* Gets the response object from internally stored serialized version
*/
public Request request() throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readRequest(in);
}
}
public JdbcRequest request(Request request) {
this.request = request;
/**
* Converts the response object into internally stored serialized version
*/
public JdbcRequest request(Request request) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeRequest(request, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public int hashCode() {
return Objects.hash(request);
return Objects.hash(bytesReference);
}
@Override
@ -58,11 +92,27 @@ public class JdbcRequest extends ActionRequest implements CompositeIndicesReques
}
JdbcRequest other = (JdbcRequest) obj;
return Objects.equals(request, other.request);
return Objects.equals(bytesReference, other.bytesReference);
}
@Override
public String getDescription() {
return "SQL JDBC [" + request + "]";
try {
return "SQL JDBC [" + request() + "]";
} catch (IOException ex) {
return "SQL JDBC [" + ex.getMessage() + "]";
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
}

View File

@ -9,17 +9,19 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
public class JdbcRequestBuilder extends ActionRequestBuilder<JdbcRequest, JdbcResponse, JdbcRequestBuilder> {
public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action) {
this(client, action, null);
super(client, action, new JdbcRequest());
}
public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action, Request req) {
super(client, action, new JdbcRequest(req));
}
public JdbcRequestBuilder request(Request req) {
public JdbcRequestBuilder request(Request req) throws IOException {
request.request(req);
return this;
}

View File

@ -6,30 +6,87 @@
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
public class JdbcResponse extends ActionResponse {
private BytesReference bytesReference;
private Response response;
private RowSetCursor cursor;
public JdbcResponse() {
}
public JdbcResponse() {}
public JdbcResponse(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
public JdbcResponse(Response response) {
this(response, null);
try {
response(response);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
public JdbcResponse(Response response, RowSetCursor cursor) {
this.response = response;
this.cursor = cursor;
/**
* Gets the response object from internally stored serialized version
*
* @param request the request that was used to generate this response
*/
public Response response(Request request) throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readResponse(request, in);
}
}
public Response response() {
return response;
/**
* Serialized the response object into internally stored serialized version
*/
public JdbcResponse response(Response response) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public RowSetCursor cursor() {
return cursor;
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JdbcResponse that = (JdbcResponse) o;
return Objects.equals(bytesReference, that.bytesReference);
}
@Override
public int hashCode() {
return Objects.hash(bytesReference);
}
}

View File

@ -18,6 +18,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
@ -36,8 +39,15 @@ public class TransportJdbcAction extends HandledTransportAction<JdbcRequest, Jdb
}
@Override
protected void doExecute(JdbcRequest request, ActionListener<JdbcResponse> listener) {
protected void doExecute(JdbcRequest jdbcRequest, ActionListener<JdbcResponse> listener) {
final Request request;
try {
request = jdbcRequest.request();
} catch (IOException ex) {
listener.onFailure(ex);
return;
}
// NOCOMMIT looks like this runs on the netty threadpool which might be bad. If we go async immediately it is ok, but we don't.
jdbcServer.handle(request.request(), chain(listener, JdbcResponse::new));
jdbcServer.handle(request, chain(listener, JdbcResponse::new));
}
}