SQL: make CliAction transport friendly (elastic/x-pack-elasticsearch#2198)

Adds proper serialization to CliAction's requests and responses

Original commit: elastic/x-pack-elasticsearch@2d2a15a5ba
This commit is contained in:
Igor Motov 2017-08-07 16:15:11 -04:00 committed by GitHub
parent bcd9934050
commit cc41a2daa0
11 changed files with 318 additions and 106 deletions

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.util.Arrays;
import java.util.Collection;
public abstract class AbstractSqlIntegTestCase extends ESIntegTestCase {
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false);
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings transportClientSettings() {
// Plugin should be loaded on the transport client as well
return nodeSettings(0);
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
public class CliActionIT extends AbstractSqlIntegTestCase {
public void testCliAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42))
.add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
Request request = new CommandRequest("SELECT * FROM test ORDER BY count");
CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get();
assertThat(response.response(request).toString(), containsString("bar"));
assertThat(response.response(request).toString(), containsString("baz"));
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliRequest;
public class CliRequestTests extends AbstractStreamableTestCase<CliRequest> {
@Override
protected CliRequest createTestInstance() {
if (randomBoolean()) {
return new CliRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10),
randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new CliRequest(new CommandRequest(randomAlphaOfLength(10)));
}
}
@Override
protected CliRequest createBlankInstance() {
return new CliRequest();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
public class CliResponseTests extends AbstractStreamableTestCase<CliResponse> {
@Override
protected CliResponse createTestInstance() {
if (randomBoolean()) {
return new CliResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10),
randomByte(), randomByte(),
randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new CliResponse(new CommandResponse(randomNonNegativeLong(), randomNonNegativeLong(),
randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
}
@Override
protected CliResponse createBlankInstance() {
return new CliResponse();
}
}

View File

@ -7,65 +7,16 @@ package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class SqlActionIT extends ESIntegTestCase {
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false);
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings transportClientSettings() {
// Plugin should be loaded on the transport client as well
return nodeSettings(0);
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class);
}
public class SqlActionIT extends AbstractSqlIntegTestCase {
public void testSqlAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
@ -94,7 +45,7 @@ public class SqlActionIT extends ESIntegTestCase {
assertThat(response.rows().get(1).get("count"), equalTo(43L));
// Check that columns within each row were returned in the requested order
for(Map<String, Object> row : response.rows()) {
for (Map<String, Object> row : response.rows()) {
assertThat(row.keySet().iterator().next(), equalTo(columnOrder ? "data" : "count"));
}
}

View File

@ -22,7 +22,7 @@ public class InfoRequest extends AbstractInfoRequest {
super();
}
InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}

View File

@ -14,17 +14,11 @@ import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
public class CliHttpHandler extends BaseRestHandler {
@ -40,29 +34,14 @@ public class CliHttpHandler extends BaseRestHandler {
throw new IllegalArgumentException("expected a request body");
}
try (DataInputStream in = new DataInputStream(request.content().streamInput())) {
CliRequest cliRequest = new CliRequest(Proto.INSTANCE.readRequest(in));
return c -> client.executeLocally(CliAction.INSTANCE, cliRequest,
ActionListener.wrap(response -> cliResponse(c, response), ex -> error(c, ex)));
}
}
private static void cliResponse(RestChannel channel, CliResponse response) {
BytesRestResponse restResponse = null;
try {
// NOCOMMIT use a real version
restResponse = new BytesRestResponse(OK, TEXT_CONTENT_TYPE,
AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response.response()));
} catch (IOException ex) {
restResponse = new BytesRestResponse(INTERNAL_SERVER_ERROR, TEXT_CONTENT_TYPE, StringUtils.EMPTY);
}
channel.sendResponse(restResponse);
CliRequest cliRequest = new CliRequest(request.content());
return c -> client.executeLocally(CliAction.INSTANCE, cliRequest,
ActionListener.wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())),
ex -> error(c, ex)));
}
private static void error(RestChannel channel, Exception ex) {
BytesRestResponse response = null;
BytesRestResponse response;
try {
response = new BytesRestResponse(channel, ex);
} catch (IOException e) {

View File

@ -8,43 +8,77 @@ package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class CliRequest extends ActionRequest implements CompositeIndicesRequest {
private Request request;
private BytesReference bytesReference;
public CliRequest() {}
public CliRequest() {
}
public CliRequest(Request request) {
this.request = request;
try {
request(request);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
public CliRequest(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (request == null) {
if (bytesReference == null) {
validationException = addValidationError("no request has been specified", validationException);
}
return validationException;
}
public Request request() {
return request;
/**
* Gets the response object from internally stored serialized version
*/
public Request request() throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readRequest(in);
}
}
public CliRequest request(Request request) {
this.request = request;
/**
* Converts the response object into internally stored serialized version
*/
public CliRequest request(Request request) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeRequest(request, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public int hashCode() {
return Objects.hash(request);
return Objects.hash(bytesReference);
}
@Override
@ -58,11 +92,27 @@ public class CliRequest extends ActionRequest implements CompositeIndicesRequest
}
CliRequest other = (CliRequest) obj;
return Objects.equals(request, other.request);
return Objects.equals(bytesReference, other.bytesReference);
}
@Override
public String getDescription() {
return "SQL CLI [" + request + "]";
try {
return "SQL CLI [" + request() + "]";
} catch (IOException ex) {
return "SQL CLI [" + ex.getMessage() + "]";
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
}

View File

@ -9,17 +9,19 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
public class CliRequestBuilder extends ActionRequestBuilder<CliRequest, CliResponse, CliRequestBuilder> {
public CliRequestBuilder(ElasticsearchClient client, CliAction action) {
this(client, action, null);
super(client, action, new CliRequest());
}
public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) {
public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) {
super(client, action, new CliRequest(req));
}
public CliRequestBuilder request(Request req) {
public CliRequestBuilder request(Request req)throws IOException {
request.request(req);
return this;
}

View File

@ -6,23 +6,87 @@
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
public class CliResponse extends ActionResponse {
private Response response;
private BytesReference bytesReference;
public CliResponse() {}
public CliResponse() {
}
public CliResponse(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
public CliResponse(Response response) {
this(response, null);
try {
response(response);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
public CliResponse(Response response, RowSetCursor cursor) {
this.response = response;
/**
* Gets the response object from internally stored serialized version
*
* @param request the request that was used to generate this response
*/
public Response response(Request request) throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readResponse(request, in);
}
}
public Response response() {
return response;
/**
* Serialized the response object into internally stored serialized version
*/
public CliResponse response(Response response) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CliResponse that = (CliResponse) o;
return Objects.equals(bytesReference, that.bytesReference);
}
@Override
public int hashCode() {
return Objects.hash(bytesReference);
}
}

View File

@ -18,6 +18,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.cli.CliServer;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
@ -26,17 +29,25 @@ public class TransportCliAction extends HandledTransportAction<CliRequest, CliRe
@Inject
public TransportCliAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
}
@Override
protected void doExecute(CliRequest request, ActionListener<CliResponse> listener) {
cliServer.handle(request.request(), chain(listener, CliResponse::new));
protected void doExecute(CliRequest cliRequest, ActionListener<CliResponse> listener) {
final Request request;
try {
request = cliRequest.request();
} catch (IOException ex) {
listener.onFailure(ex);
return;
}
// NOCOMMIT we need to pass the protocol version of the client to the response here
cliServer.handle(request, chain(listener, CliResponse::new));
}
}