SQL: make SqlAction transport friendly ()

Adds proper serialization to SqlAction's requests and responses

Original commit: elastic/x-pack-elasticsearch@113f69d5b9
This commit is contained in:
Igor Motov 2017-08-04 16:25:38 -04:00 committed by GitHub
parent f241512e33
commit 0844ada594
9 changed files with 301 additions and 42 deletions

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class SqlActionIT extends ESIntegTestCase {
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false);
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings transportClientSettings() {
// Plugin should be loaded on the transport client as well
return nodeSettings(0);
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class);
}
public void testSqlAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42))
.add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
boolean columnOrder = randomBoolean();
String columns = columnOrder ? "data, count" : "count, data";
SqlResponse response = client().prepareExecute(SqlAction.INSTANCE).query("SELECT " + columns + " FROM test ORDER BY count").get();
assertThat(response.size(), equalTo(2L));
assertThat(response.columns().keySet(), hasSize(2));
assertThat(response.columns().get("data"), equalTo("text"));
assertThat(response.columns().get("count"), equalTo("long"));
// Check that columns were returned in the requested order
assertThat(response.columns().keySet().iterator().next(), equalTo(columnOrder ? "data" : "count"));
assertThat(response.rows(), hasSize(2));
assertThat(response.rows().get(0).get("data"), equalTo("bar"));
assertThat(response.rows().get(0).get("count"), equalTo(42L));
assertThat(response.rows().get(1).get("data"), equalTo("baz"));
assertThat(response.rows().get(1).get("count"), equalTo(43L));
// Check that columns within each row were returned in the requested order
for(Map<String, Object> row : response.rows()) {
assertThat(row.keySet().iterator().next(), equalTo(columnOrder ? "data" : "count"));
}
}
}

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
public class SqlRequestTests extends AbstractStreamableTestCase<SqlRequest> {
@Override
protected SqlRequest createTestInstance() {
return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), randomBoolean() ? randomAlphaOfLength(10) : null);
}
@Override
protected SqlRequest createBlankInstance() {
return new SqlRequest();
}
}

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SqlResponseTests extends AbstractStreamableTestCase<SqlResponse> {
@Override
protected SqlResponse createTestInstance() {
Map<String, String> columns;
List<Map<String, Object>> rows;
if (randomBoolean()) {
columns = Collections.emptyMap();
} else {
int size = randomIntBetween(1, 10);
columns = new HashMap<>(size);
for (int i = 0; i < size; i++) {
columns.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
}
if (randomBoolean()) {
rows = Collections.emptyList();
} else {
int size = randomIntBetween(1, 10);
rows = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> row = new HashMap<>(size);
for (int j = 0; i < size; i++) {
row.put(randomAlphaOfLength(10), randomBoolean() ? randomAlphaOfLength(10) : randomInt());
}
rows.add(row);
}
}
return new SqlResponse(randomAlphaOfLength(10), randomNonNegativeLong(), columns, rows);
}
@Override
protected SqlResponse createBlankInstance() {
return new SqlResponse();
}
}

@ -20,9 +20,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SqlRequest extends ActionRequest implements CompositeIndicesRequest {
public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC;
// initialized on the first request
private String query;
private DateTimeZone timeZone;
private DateTimeZone timeZone = DEFAULT_TIME_ZONE;
// initialized after the plan has been translated
private String sessionId;
@ -40,6 +41,9 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
if (!Strings.hasText(query)) {
validationException = addValidationError("sql query is missing", validationException);
}
if (timeZone == null) {
validationException = addValidationError("timezone is missing", validationException);
}
return validationException;
}
@ -65,6 +69,11 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
return this;
}
public SqlRequest timeZone(DateTimeZone timeZone) {
this.timeZone = timeZone;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

@ -9,10 +9,12 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.joda.time.DateTimeZone;
import static org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest.DEFAULT_TIME_ZONE;
public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlResponse, SqlRequestBuilder> {
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action) {
this(client, action, null, null, null);
this(client, action, null, DEFAULT_TIME_ZONE, null);
}
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, String sessionId) {
@ -28,4 +30,10 @@ public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlRespo
request.sessionId(sessionId);
return this;
}
public SqlRequestBuilder timeZone(DateTimeZone timeZone) {
request.timeZone(timeZone);
return this;
}
}

@ -6,35 +6,79 @@
package org.elasticsearch.xpack.sql.plugin.sql.action;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
public class SqlResponse extends ActionResponse {
private String sessionId;
private RowSetCursor rowCursor;
private long size;
// NOCOMMIT: we probably need to add more info about columns, but that's all we use for now
// NOCOMMIT: order of elements is important, so we might want to replace this with lists and
// reflect this in generated JSON as well
private Map<String, String> columns;
private List<Map<String, Object>> rows;
public SqlResponse() {}
public SqlResponse(String sessionId, RowSetCursor rowCursor) {
this.sessionId = sessionId;
this.rowCursor = rowCursor;
public SqlResponse() {
}
public RowSetCursor rowSetCursor() {
return rowCursor;
public SqlResponse(String sessionId, long size, Map<String, String> columns, List<Map<String, Object>> rows) {
this.sessionId = sessionId;
this.size = size;
this.columns = columns;
this.rows = rows;
}
public long size() {
return size;
}
public Map<String, String> columns() {
return columns;
}
public List<Map<String, Object>> rows() {
return rows;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("only local transport");
sessionId = in.readOptionalString();
size = in.readVLong();
columns = in.readMap(StreamInput::readString, StreamInput::readString);
rows = in.readList(StreamInput::readMap);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("only local transport");
out.writeOptionalString(sessionId);
out.writeVLong(size);
out.writeMap(columns, StreamOutput::writeString, StreamOutput::writeString);
out.writeVInt(rows.size());
for (Map<String, Object> row : rows) {
out.writeMap(row);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SqlResponse that = (SqlResponse) o;
return size == that.size &&
Objects.equals(sessionId, that.sessionId) &&
Objects.equals(columns, that.columns) &&
Objects.equals(rows, that.rows);
}
@Override
public int hashCode() {
return Objects.hash(sessionId, size, columns, rows);
}
}

@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.cache.Cache;
@ -23,6 +22,10 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
@ -30,21 +33,20 @@ import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlResponse> {
//TODO: externalize timeout
private final Cache<String, RowSetCursor> SESSIONS = CacheBuilder.<String, RowSetCursor> builder()
private final Cache<String, RowSetCursor> SESSIONS = CacheBuilder.<String, RowSetCursor>builder()
.setMaximumWeight(1024)
.setExpireAfterAccess(TimeValue.timeValueMinutes(10))
.setExpireAfterWrite(TimeValue.timeValueMinutes(10))
.build();
private final Supplier<String> ephemeralId;
private final PlanExecutor planExecutor;
@Inject
public TransportSqlAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
PlanExecutor planExecutor) {
super(settings, SqlAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SqlRequest::new);
this.planExecutor = planExecutor;
@ -69,20 +71,16 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
// .put(SqlSettings.TIMEZONE_ID, request.timeZone().getID()).build());
planExecutor.sql(query, chain(listener, c -> {
String id = generateId();
SESSIONS.put(id, c);
return new SqlResponse(id, c);
}));
}
else {
String id = generateId();
SESSIONS.put(id, c);
return createResponse(id, c);
}));
} else {
RowSetCursor cursor = SESSIONS.get(sessionId);
if (cursor == null) {
listener.onFailure(new SqlIllegalArgumentException("SQL session cannot be found"));
}
else {
cursor.nextSet(chain(listener, c -> {
return new SqlResponse(sessionId, c);
}));
} else {
cursor.nextSet(chain(listener, c -> createResponse(sessionId, cursor)));
}
}
} catch (Exception ex) {
@ -93,4 +91,23 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
private String generateId() {
return ephemeralId.get() + "-" + UUIDs.base64UUID();
}
static SqlResponse createResponse(String sessionId, RowSetCursor cursor) {
Map<String, String> columns = new LinkedHashMap<>(cursor.schema().types().size());
cursor.schema().forEach(entry -> {
columns.put(entry.name(), entry.type().esName());
});
List<Map<String, Object>> rows = new ArrayList<>();
cursor.forEachRow(objects -> {
Map<String, Object> row = new LinkedHashMap<>(objects.rowSize());
objects.forEachColumn((o, entry) -> row.put(entry.name(), o));
rows.add(row);
});
return new SqlResponse(
sessionId,
cursor.size(),
columns,
rows);
}
}

@ -11,12 +11,11 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowView;
import org.elasticsearch.xpack.sql.type.Schema.Entry;
import org.elasticsearch.xpack.sql.util.ThrowableBiConsumer;
import org.elasticsearch.xpack.sql.util.ThrowableConsumer;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.OK;
class CursorRestResponseListener extends RestBuilderListener<SqlResponse> {
@ -27,27 +26,29 @@ class CursorRestResponseListener extends RestBuilderListener<SqlResponse> {
@Override
public RestResponse buildResponse(SqlResponse response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(OK, createResponse(response.rowSetCursor(), builder));
return new BytesRestResponse(OK, createResponse(response, builder));
}
static XContentBuilder createResponse(RowSetCursor cursor, XContentBuilder builder) throws Exception {
private static XContentBuilder createResponse(SqlResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
// header
builder.field("size", cursor.size());
builder.field("size", response.size());
// NOCOMMIT: that should be a list since order is important
builder.startObject("columns");
ThrowableConsumer<Entry> buildSchema = e -> builder.startObject(e.name()).field("type", e.type().esName()).endObject();
cursor.schema().forEach(buildSchema);
ThrowableBiConsumer<String, String> buildSchema = (f, t) -> builder.startObject(f).field("type", t).endObject();
response.columns().forEach(buildSchema);
builder.endObject();
// payload
builder.startArray("rows");
ThrowableBiConsumer<Object, Entry> eachColumn = (v, e) -> builder.field(e.name(), v);
ThrowableConsumer<RowView> eachRow = r -> { builder.startObject(); r.forEachColumn(eachColumn); builder.endObject(); };
ThrowableBiConsumer<String, Object> eachColumn = builder::field;
// NOCOMMIT: that should be a list since order is important
ThrowableConsumer<Map<String, Object>> eachRow = r -> { builder.startObject(); r.forEach(eachColumn); builder.endObject(); };
cursor.forEachRow(eachRow);
response.rows().forEach(eachRow);
builder.endArray();
builder.endObject();

@ -73,7 +73,7 @@ public class RestSqlAction extends BaseRestHandler {
}
String query;
DateTimeZone timeZone;
DateTimeZone timeZone = SqlRequest.DEFAULT_TIME_ZONE;
static Payload from(RestRequest request) throws IOException {
Payload payload = new Payload();