diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java new file mode 100644 index 00000000000..74b559b419d --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlActionIT.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class SqlActionIT extends ESIntegTestCase { + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); + return settings.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, ReindexPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + @Override + protected Settings transportClientSettings() { + // Plugin should be loaded on the transport client as well + return nodeSettings(0); + } + + @Override + protected Collection> getMockPlugins() { + return Arrays.asList(TestZenDiscovery.TestPlugin.class, TestSeedPlugin.class); + } + + + public void testSqlAction() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").get()); + client().prepareBulk() + .add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42)) + .add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + ensureYellow("test"); + + boolean columnOrder = randomBoolean(); + String columns = columnOrder ? "data, count" : "count, data"; + SqlResponse response = client().prepareExecute(SqlAction.INSTANCE).query("SELECT " + columns + " FROM test ORDER BY count").get(); + assertThat(response.size(), equalTo(2L)); + assertThat(response.columns().keySet(), hasSize(2)); + assertThat(response.columns().get("data"), equalTo("text")); + assertThat(response.columns().get("count"), equalTo("long")); + + // Check that columns were returned in the requested order + assertThat(response.columns().keySet().iterator().next(), equalTo(columnOrder ? "data" : "count")); + + assertThat(response.rows(), hasSize(2)); + assertThat(response.rows().get(0).get("data"), equalTo("bar")); + assertThat(response.rows().get(0).get("count"), equalTo(42L)); + assertThat(response.rows().get(1).get("data"), equalTo("baz")); + assertThat(response.rows().get(1).get("count"), equalTo(43L)); + + // Check that columns within each row were returned in the requested order + for(Map row : response.rows()) { + assertThat(row.keySet().iterator().next(), equalTo(columnOrder ? "data" : "count")); + } + } +} + diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlRequestTests.java new file mode 100644 index 00000000000..8aeebcd19de --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlRequestTests.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest; + +public class SqlRequestTests extends AbstractStreamableTestCase { + + @Override + protected SqlRequest createTestInstance() { + return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), randomBoolean() ? randomAlphaOfLength(10) : null); + } + + @Override + protected SqlRequest createBlankInstance() { + return new SqlRequest(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlResponseTests.java new file mode 100644 index 00000000000..9f569cfd682 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlResponseTests.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SqlResponseTests extends AbstractStreamableTestCase { + + @Override + protected SqlResponse createTestInstance() { + Map columns; + List> rows; + if (randomBoolean()) { + columns = Collections.emptyMap(); + } else { + int size = randomIntBetween(1, 10); + columns = new HashMap<>(size); + for (int i = 0; i < size; i++) { + columns.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + } + + if (randomBoolean()) { + rows = Collections.emptyList(); + } else { + int size = randomIntBetween(1, 10); + rows = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Map row = new HashMap<>(size); + for (int j = 0; i < size; i++) { + row.put(randomAlphaOfLength(10), randomBoolean() ? randomAlphaOfLength(10) : randomInt()); + } + rows.add(row); + } + } + + + return new SqlResponse(randomAlphaOfLength(10), randomNonNegativeLong(), columns, rows); + } + + @Override + protected SqlResponse createBlankInstance() { + return new SqlResponse(); + } + +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java index c7e067825d6..034db7aa1cf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java @@ -20,9 +20,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; public class SqlRequest extends ActionRequest implements CompositeIndicesRequest { + public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC; // initialized on the first request private String query; - private DateTimeZone timeZone; + private DateTimeZone timeZone = DEFAULT_TIME_ZONE; // initialized after the plan has been translated private String sessionId; @@ -40,6 +41,9 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest if (!Strings.hasText(query)) { validationException = addValidationError("sql query is missing", validationException); } + if (timeZone == null) { + validationException = addValidationError("timezone is missing", validationException); + } return validationException; } @@ -65,6 +69,11 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest return this; } + public SqlRequest timeZone(DateTimeZone timeZone) { + this.timeZone = timeZone; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java index b63bcd9ec7c..78d8c821e9b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java @@ -9,10 +9,12 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.joda.time.DateTimeZone; +import static org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest.DEFAULT_TIME_ZONE; + public class SqlRequestBuilder extends ActionRequestBuilder { public SqlRequestBuilder(ElasticsearchClient client, SqlAction action) { - this(client, action, null, null, null); + this(client, action, null, DEFAULT_TIME_ZONE, null); } public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, String sessionId) { @@ -28,4 +30,10 @@ public class SqlRequestBuilder extends ActionRequestBuilder columns; + private List> rows; - public SqlResponse() {} - public SqlResponse(String sessionId, RowSetCursor rowCursor) { - this.sessionId = sessionId; - this.rowCursor = rowCursor; + public SqlResponse() { } - public RowSetCursor rowSetCursor() { - return rowCursor; + public SqlResponse(String sessionId, long size, Map columns, List> rows) { + this.sessionId = sessionId; + this.size = size; + this.columns = columns; + this.rows = rows; + } + + public long size() { + return size; + } + + public Map columns() { + return columns; + } + + public List> rows() { + return rows; } @Override public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("only local transport"); + sessionId = in.readOptionalString(); + size = in.readVLong(); + columns = in.readMap(StreamInput::readString, StreamInput::readString); + rows = in.readList(StreamInput::readMap); } @Override public void writeTo(StreamOutput out) throws IOException { - throw new UnsupportedOperationException("only local transport"); + out.writeOptionalString(sessionId); + out.writeVLong(size); + out.writeMap(columns, StreamOutput::writeString, StreamOutput::writeString); + out.writeVInt(rows.size()); + for (Map row : rows) { + out.writeMap(row); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SqlResponse that = (SqlResponse) o; + return size == that.size && + Objects.equals(sessionId, that.sessionId) && + Objects.equals(columns, that.columns) && + Objects.equals(rows, that.rows); + } + + @Override + public int hashCode() { + return Objects.hash(sessionId, size, columns, rows); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java index 41a9569d011..f9a57211514 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.cache.Cache; @@ -23,6 +22,10 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.session.RowSetCursor; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; import static org.elasticsearch.xpack.sql.util.ActionUtils.chain; @@ -30,21 +33,20 @@ import static org.elasticsearch.xpack.sql.util.ActionUtils.chain; public class TransportSqlAction extends HandledTransportAction { //TODO: externalize timeout - private final Cache SESSIONS = CacheBuilder. builder() + private final Cache SESSIONS = CacheBuilder.builder() .setMaximumWeight(1024) .setExpireAfterAccess(TimeValue.timeValueMinutes(10)) .setExpireAfterWrite(TimeValue.timeValueMinutes(10)) .build(); - + private final Supplier ephemeralId; private final PlanExecutor planExecutor; @Inject public TransportSqlAction(Settings settings, ThreadPool threadPool, - TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, - PlanExecutor planExecutor) { + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + PlanExecutor planExecutor) { super(settings, SqlAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SqlRequest::new); this.planExecutor = planExecutor; @@ -69,20 +71,16 @@ public class TransportSqlAction extends HandledTransportAction { - String id = generateId(); - SESSIONS.put(id, c); - return new SqlResponse(id, c); - })); - } - else { + String id = generateId(); + SESSIONS.put(id, c); + return createResponse(id, c); + })); + } else { RowSetCursor cursor = SESSIONS.get(sessionId); if (cursor == null) { listener.onFailure(new SqlIllegalArgumentException("SQL session cannot be found")); - } - else { - cursor.nextSet(chain(listener, c -> { - return new SqlResponse(sessionId, c); - })); + } else { + cursor.nextSet(chain(listener, c -> createResponse(sessionId, cursor))); } } } catch (Exception ex) { @@ -93,4 +91,23 @@ public class TransportSqlAction extends HandledTransportAction columns = new LinkedHashMap<>(cursor.schema().types().size()); + cursor.schema().forEach(entry -> { + columns.put(entry.name(), entry.type().esName()); + }); + + List> rows = new ArrayList<>(); + cursor.forEachRow(objects -> { + Map row = new LinkedHashMap<>(objects.rowSize()); + objects.forEachColumn((o, entry) -> row.put(entry.name(), o)); + rows.add(row); + }); + return new SqlResponse( + sessionId, + cursor.size(), + columns, + rows); + } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/CursorRestResponseListener.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/CursorRestResponseListener.java index 426f6d5a61f..32e384c56f4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/CursorRestResponseListener.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/CursorRestResponseListener.java @@ -11,12 +11,11 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; -import org.elasticsearch.xpack.sql.session.RowSetCursor; -import org.elasticsearch.xpack.sql.session.RowView; -import org.elasticsearch.xpack.sql.type.Schema.Entry; import org.elasticsearch.xpack.sql.util.ThrowableBiConsumer; import org.elasticsearch.xpack.sql.util.ThrowableConsumer; +import java.util.Map; + import static org.elasticsearch.rest.RestStatus.OK; class CursorRestResponseListener extends RestBuilderListener { @@ -27,27 +26,29 @@ class CursorRestResponseListener extends RestBuilderListener { @Override public RestResponse buildResponse(SqlResponse response, XContentBuilder builder) throws Exception { - return new BytesRestResponse(OK, createResponse(response.rowSetCursor(), builder)); + return new BytesRestResponse(OK, createResponse(response, builder)); } - static XContentBuilder createResponse(RowSetCursor cursor, XContentBuilder builder) throws Exception { + private static XContentBuilder createResponse(SqlResponse response, XContentBuilder builder) throws Exception { builder.startObject(); // header - builder.field("size", cursor.size()); + builder.field("size", response.size()); + // NOCOMMIT: that should be a list since order is important builder.startObject("columns"); - ThrowableConsumer buildSchema = e -> builder.startObject(e.name()).field("type", e.type().esName()).endObject(); - cursor.schema().forEach(buildSchema); + ThrowableBiConsumer buildSchema = (f, t) -> builder.startObject(f).field("type", t).endObject(); + response.columns().forEach(buildSchema); builder.endObject(); // payload builder.startArray("rows"); - ThrowableBiConsumer eachColumn = (v, e) -> builder.field(e.name(), v); - ThrowableConsumer eachRow = r -> { builder.startObject(); r.forEachColumn(eachColumn); builder.endObject(); }; + ThrowableBiConsumer eachColumn = builder::field; + // NOCOMMIT: that should be a list since order is important + ThrowableConsumer> eachRow = r -> { builder.startObject(); r.forEach(eachColumn); builder.endObject(); }; - cursor.forEachRow(eachRow); + response.rows().forEach(eachRow); builder.endArray(); builder.endObject(); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java index 7f55b51985c..2b39fe533f6 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java @@ -73,7 +73,7 @@ public class RestSqlAction extends BaseRestHandler { } String query; - DateTimeZone timeZone; + DateTimeZone timeZone = SqlRequest.DEFAULT_TIME_ZONE; static Payload from(RestRequest request) throws IOException { Payload payload = new Payload();