From 5d3f5cc4f8e600478cc506de1fe230de4bec6c5a Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 14 Sep 2017 10:26:42 -0400 Subject: [PATCH] Support scrolling in SQL's CLI (elastic/x-pack-elasticsearch#2494) * Move CLI to TransportSqlAction * Moves REST endpoint from `/_cli` to `/_sql/cli` * Removes the special purpose CLI transport action instead implements the CLI entirely on the REST layer, delegating all SQL stuff to the same action that backs the `/_sql` REST API. * Reworks "embedded testing mode" to use a `FilterClient` to bounce capture the sql transport action and execute in embedded. * Switches CLI formatting from consuming the entire response to consuming just the first page of the response and returning a `cursor` that can be used to read the next page. That read is not yet implemented. * Switch CLI formatting from the consuming the `RowSetCursor` to consuming the `SqlResponse` object. * Adds tests for CLI formatting. * Support next page in the cli * Rename cli's CommandRequest/CommandResponse to QueryInitRequest/QueryInitResponse to line up with jdbc * Implement QueryPageRequest/QueryPageResponse in cli * Use `byte[]` to represent the cursor in the cli. Those bytes mean something, but only to the server. The only reasonint that the client does about them is "if length == 0 then there isn't a next page." * Pull common code from jdbc's QueryInitRequest, QueryPageRequest, QueryInitResponse, and QueryPageResponse into the shared-proto project * By implication this switches jdbc's QueryPageRequest to using the same cursor implementation as the cli Original commit: elastic/x-pack-elasticsearch@193586f1eeae0dfa8d835e225b7257161dde30db --- .../security/authz/AuthorizationService.java | 4 +- .../elasticsearch/xpack/sql/CliActionIT.java | 35 ---- .../elasticsearch/xpack/sql/SqlLicenseIT.java | 20 --- .../sql/cli/net/protocol/CommandRequest.java | 55 ------- .../sql/cli/net/protocol/CommandResponse.java | 80 --------- .../xpack/sql/cli/net/protocol/Proto.java | 6 +- .../cli/net/protocol/QueryInitRequest.java | 32 ++++ .../cli/net/protocol/QueryInitResponse.java | 33 ++++ .../cli/net/protocol/QueryPageRequest.java | 28 ++++ .../cli/net/protocol/QueryPageResponse.java | 33 ++++ .../sql/cli/net/protocol/QueryResponse.java | 56 +++++++ .../net/protocol/CliRoundTripTestUtils.java | 7 + .../cli/net/protocol/CommandRequestTests.java | 26 --- .../net/protocol/CommandResponseTests.java | 30 ---- .../cli/net/protocol/ErrorResponseTests.java | 10 +- .../net/protocol/ExceptionResponseTests.java | 10 +- .../net/protocol/QueryInitRequestTests.java | 32 ++++ .../net/protocol/QueryInitResponseTests.java | 30 ++++ .../net/protocol/QueryPageRequestTests.java | 30 ++++ .../net/protocol/QueryPageResponseTests.java | 30 ++++ .../org/elasticsearch/xpack/sql/cli/Cli.java | 134 ++++++++++----- .../xpack/sql/cli/CliHttpClient.java | 62 +++++++ .../sql/cli/{net/client => }/HttpClient.java | 3 +- .../xpack/sql/cli/ResponseToString.java | 6 +- .../sql/cli/net/client/CliHttpClient.java | 56 ------- .../xpack/sql/cli/CliHttpServer.java | 6 +- .../xpack/sql/cli/CliIntegrationTestCase.java | 4 +- .../xpack/sql/cli/CliProtoHandler.java | 32 ++-- .../sql/cli/EmbeddedModeFilterClient.java | 45 ++++++ .../xpack/sql/cli/FetchSizeIT.java | 56 +++++++ .../xpack/sql/cli/ResponseToStringTests.java | 13 +- .../jdbc/net/protocol/QueryInitRequest.java | 57 +------ .../jdbc/net/protocol/QueryInitResponse.java | 38 ++--- .../jdbc/net/protocol/QueryPageRequest.java | 53 +----- .../jdbc/net/protocol/QueryPageResponse.java | 29 ++-- .../net/protocol/JdbcRoundTripTestUtils.java | 7 + .../net/protocol/QueryInitRequestTests.java | 5 +- .../net/protocol/QueryInitResponseTests.java | 8 +- .../net/protocol/QueryPageRequestTests.java | 11 +- .../net/protocol/QueryPageResponseTests.java | 7 +- .../sql/jdbc/net/client/DefaultCursor.java | 15 +- .../sql/jdbc/net/client/JdbcHttpClient.java | 10 +- .../xpack/sql/plugin/CliFormatter.java | 151 +++++++++++++++++ .../xpack/sql/plugin/RestSqlCliAction.java | 153 ++++++++++++++++++ .../xpack/sql/plugin/SqlPlugin.java | 6 +- .../xpack/sql/plugin/cli/CliServer.java | 96 ----------- .../xpack/sql/plugin/cli/CliUtils.java | 122 -------------- .../sql/plugin/cli/action/CliAction.java | 29 ---- .../sql/plugin/cli/action/CliHttpHandler.java | 57 ------- .../sql/plugin/cli/action/CliRequest.java | 118 -------------- .../plugin/cli/action/CliRequestBuilder.java | 28 ---- .../sql/plugin/cli/action/CliResponse.java | 92 ----------- .../plugin/cli/action/TransportCliAction.java | 58 ------- .../xpack/sql/plugin/jdbc/JdbcServer.java | 12 +- .../sql/plugin/sql/action/SqlRequest.java | 16 +- .../sql/plugin/sql/action/SqlResponse.java | 10 +- .../plugin/sql/action/TransportSqlAction.java | 7 + .../sql/plugin/sql/rest/RestSqlAction.java | 2 +- .../xpack/sql/plugin/CliFormatterTests.java | 69 ++++++++ .../plugin/cli/action/CliRequestTests.java | 29 ---- .../plugin/cli/action/CliResponseTests.java | 31 ---- .../sql/protocol/shared/AbstractProto.java | 3 +- .../shared/AbstractQueryInitRequest.java | 74 +++++++++ .../shared/AbstractQueryPageRequest.java | 66 ++++++++ .../shared/AbstractQueryResponse.java | 86 ++++++++++ .../xpack/sql/protocol/shared/ProtoUtil.java | 21 +++ .../sql/protocol/shared}/TimeoutInfo.java | 2 +- .../protocol/shared}/TimeoutInfoTests.java | 2 +- .../xpack/sql/test/server/ProtoHandler.java | 2 + 69 files changed, 1347 insertions(+), 1239 deletions(-) delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java delete mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequest.java delete mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponse.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequest.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java delete mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequestTests.java delete mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponseTests.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequestTests.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java create mode 100644 sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java rename sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/{net/client => }/HttpClient.java (91%) delete mode 100644 sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/CliHttpClient.java create mode 100644 sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/EmbeddedModeFilterClient.java create mode 100644 sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/FetchSizeIT.java create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatter.java create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliServer.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliUtils.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliAction.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java delete mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java create mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/CliFormatterTests.java delete mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestTests.java delete mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponseTests.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryInitRequest.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/ProtoUtil.java rename sql/{jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol => shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared}/TimeoutInfo.java (96%) rename sql/{jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol => shared-proto/src/test/java/org/elasticsearch/xpack/sql/protocol/shared}/TimeoutInfoTests.java (93%) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 18f61544332..57342a40b2a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -64,7 +64,6 @@ import org.elasticsearch.xpack.security.user.AnonymousUser; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.XPackUser; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; @@ -485,8 +484,7 @@ public class AuthorizationService extends AbstractComponent { */ private static boolean isDelayedIndicesAction(String action) { return action.equals(SqlAction.NAME) || - action.equals(JdbcAction.NAME) || - action.equals(CliAction.NAME); + action.equals(JdbcAction.NAME); } private static boolean isTranslatedToBulkAction(String action) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java deleted file mode 100644 index 2b9ceb4f7e3..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/sql/CliActionIT.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql; - -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse; -import org.elasticsearch.xpack.sql.protocol.shared.Request; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.containsString; - -public class CliActionIT extends AbstractSqlIntegTestCase { - - public void testCliAction() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test").get()); - client().prepareBulk() - .add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42)) - .add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .get(); - ensureYellow("test"); - - Request request = new CommandRequest("SELECT * FROM test ORDER BY count"); - - CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get(); - assertThat(response.response(request).toString(), containsString("bar")); - assertThat(response.response(request).toString(), containsString("baz")); - } -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java index dfa9099fe31..e743f7416f0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java @@ -12,11 +12,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.AbstractLicensesIntegrationTestCase; import org.elasticsearch.license.License; import org.elasticsearch.license.License.OperationMode; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse; import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction; import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; @@ -32,7 +29,6 @@ import static org.elasticsearch.license.XPackLicenseStateTests.randomBasicStanda import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialBasicStandardGoldOrPlatinumMode; import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialOrPlatinumMode; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.IsEqual.equalTo; @@ -103,22 +99,6 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase { assertThat(response.size(), Matchers.equalTo(2L)); } - public void testCliActionLicense() throws Exception { - setupTestIndex(); - disableSqlLicensing(); - - Request request = new CommandRequest("SELECT * FROM test"); - - ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class, - () -> client().prepareExecute(CliAction.INSTANCE).request(request).get()); - assertThat(e.getMessage(), equalTo("current license is non-compliant for [sql]")); - enableSqlLicensing(); - - CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get(); - assertThat(response.response(request).toString(), containsString("bar")); - assertThat(response.response(request).toString(), containsString("baz")); - } - public void testJdbcActionLicense() throws Exception { setupTestIndex(); disableJdbcLicensing(); diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequest.java deleted file mode 100644 index ae73d068931..00000000000 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.cli.net.protocol; - -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; -import org.elasticsearch.xpack.sql.protocol.shared.Request; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Objects; - -public class CommandRequest extends Request { - public final String command; - - public CommandRequest(String command) { - this.command = command; - } - - CommandRequest(int clientVersion, DataInput in) throws IOException { - command = in.readUTF(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(command); - } - - @Override - protected String toStringBody() { - return command; - } - - @Override - public RequestType requestType() { - return RequestType.COMMAND; - } - - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { - return false; - } - CommandRequest other = (CommandRequest) obj; - return Objects.equals(command, other.command); - } - - @Override - public int hashCode() { - return Objects.hash(command); - } -} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponse.java deleted file mode 100644 index 4b18df390f6..00000000000 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponse.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.cli.net.protocol; - -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType; -import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Objects; - -public class CommandResponse extends Response { - public final long serverTimeQueryReceived, serverTimeResponseSent; - public final String requestId; - public final String data; - - public CommandResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, String data) { - this.serverTimeQueryReceived = serverTimeQueryReceived; - this.serverTimeResponseSent = serverTimeResponseSent; - this.requestId = requestId; - - this.data = data; - } - - CommandResponse(Request request, DataInput in) throws IOException { - serverTimeQueryReceived = in.readLong(); - serverTimeResponseSent = in.readLong(); - requestId = in.readUTF(); - data = in.readUTF(); - } - - @Override - protected void write(int clientVersion, DataOutput out) throws IOException { - out.writeLong(serverTimeQueryReceived); - out.writeLong(serverTimeResponseSent); - out.writeUTF(requestId); - out.writeUTF(data); - } - - @Override - protected String toStringBody() { - return "received=[" + serverTimeQueryReceived - + "] sent=[" + serverTimeResponseSent - + "] requestId=[" + requestId - + "] data=[" + data + "]"; - } - - @Override - public RequestType requestType() { - return RequestType.COMMAND; - } - - @Override - public ResponseType responseType() { - return ResponseType.COMMAND; - } - - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { - return false; - } - CommandResponse other = (CommandResponse) obj; - return serverTimeQueryReceived == other.serverTimeQueryReceived - && serverTimeResponseSent == other.serverTimeResponseSent - && Objects.equals(requestId, other.requestId) - && Objects.equals(data, other.data); - } - - @Override - public int hashCode() { - return Objects.hash(serverTimeQueryReceived, serverTimeResponseSent, requestId, data); - } -} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java index f8f7804c3c3..dd54c765f8a 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java @@ -32,7 +32,8 @@ public final class Proto extends AbstractProto { public enum RequestType implements AbstractProto.RequestType { INFO(InfoRequest::new), - COMMAND(CommandRequest::new); + QUERY_INIT(QueryInitRequest::new), + QUERY_PAGE(QueryPageRequest::new); private final RequestReader reader; @@ -64,7 +65,8 @@ public final class Proto extends AbstractProto { EXCEPTION(ExceptionResponse::new), ERROR(ErrorResponse::new), INFO(InfoResponse::new), - COMMAND(CommandResponse::new); + QUERY_INIT(QueryInitResponse::new), + QUERY_PAGE(QueryPageResponse::new); private final ResponseReader reader; diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequest.java new file mode 100644 index 00000000000..307178dd6dd --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.DataInput; +import java.io.IOException; +import java.util.TimeZone; + +/** + * Request to start a query. + */ +public class QueryInitRequest extends AbstractQueryInitRequest { + public QueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) { + super(query, fetchSize, timeZone, timeout); + } + + QueryInitRequest(int clientVersion, DataInput in) throws IOException { + super(clientVersion, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_INIT; + } +} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java new file mode 100644 index 00000000000..2559365e8a7 --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.DataInput; +import java.io.IOException; + +public class QueryInitResponse extends QueryResponse { + public QueryInitResponse(long tookNanos, byte[] cursor, String data) { + super(tookNanos, cursor, data); + } + + QueryInitResponse(Request request, DataInput in) throws IOException { + super(request, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_INIT; + } + + @Override + public ResponseType responseType() { + return ResponseType.QUERY_INIT; + } +} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java new file mode 100644 index 00000000000..b415f16f07c --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.DataInput; +import java.io.IOException; + +public class QueryPageRequest extends AbstractQueryPageRequest { + public QueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + super(cursor, timeout); + } + + QueryPageRequest(int clientVersion, DataInput in) throws IOException { + super(clientVersion, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_PAGE; + } +} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java new file mode 100644 index 00000000000..eaa696218f5 --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.DataInput; +import java.io.IOException; + +public class QueryPageResponse extends QueryResponse { + public QueryPageResponse(long tookNanos, byte[] cursor, String data) { + super(tookNanos, cursor, data); + } + + QueryPageResponse(Request request, DataInput in) throws IOException { + super(request, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_PAGE; + } + + @Override + public ResponseType responseType() { + return ResponseType.QUERY_PAGE; + } +} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java new file mode 100644 index 00000000000..2a96d98050c --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; + +public abstract class QueryResponse extends AbstractQueryResponse { + public final String data; + + protected QueryResponse(long tookNanos, byte[] cursor, String data) { + super(tookNanos, cursor); + if (data == null) { + throw new IllegalArgumentException("data cannot be null"); + } + this.data = data; + } + + protected QueryResponse(Request request, DataInput in) throws IOException { + super(request, in); + data = in.readUTF(); + } + + @Override + protected void write(int clientVersion, DataOutput out) throws IOException { + super.write(clientVersion, out); + out.writeUTF(data); + } + + @Override + protected String toStringBody() { + return super.toStringBody() + " data=[" + data + "]"; + } + + @Override + public boolean equals(Object obj) { + if (false == super.equals(obj)) { + return false; + } + QueryResponse other = (QueryResponse) obj; + return data.equals(other.data); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), data); + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CliRoundTripTestUtils.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CliRoundTripTestUtils.java index 40155d3af80..7093f5ab129 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CliRoundTripTestUtils.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CliRoundTripTestUtils.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.sql.cli.net.protocol; import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import org.elasticsearch.xpack.sql.test.RoundTripTestUtils; import java.io.IOException; +import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; + public final class CliRoundTripTestUtils { private CliRoundTripTestUtils() { // Just static utilities @@ -25,4 +28,8 @@ public final class CliRoundTripTestUtils { (r, out) -> Proto.INSTANCE.writeResponse(r, Proto.CURRENT_VERSION, out), in -> Proto.INSTANCE.readResponse(request, in)); } + + static TimeoutInfo randomTimeoutInfo() { + return new TimeoutInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequestTests.java deleted file mode 100644 index f61cca083d1..00000000000 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandRequestTests.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.cli.net.protocol; - -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; - -public class CommandRequestTests extends ESTestCase { - static CommandRequest randomCommandRequest() { - return new CommandRequest(randomAlphaOfLength(5)); - } - - public void testRoundTrip() throws IOException { - assertRoundTripCurrentVersion(randomCommandRequest()); - } - - public void testToString() { - assertEquals("CommandRequest", new CommandRequest("test").toString()); - } -} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponseTests.java deleted file mode 100644 index 03209276e68..00000000000 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/CommandResponseTests.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.cli.net.protocol; - -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; -import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest; - -public class CommandResponseTests extends ESTestCase { - static CommandResponse randomCommandResponse() { - long start = randomNonNegativeLong(); - long end = randomValueOtherThanMany(l -> l >= start, ESTestCase::randomNonNegativeLong); - return new CommandResponse(start, end, randomAlphaOfLength(5), randomAlphaOfLength(5)); - } - - public void testRoundTrip() throws IOException { - assertRoundTripCurrentVersion(randomCommandRequest(), randomCommandResponse()); - } - - public void testToString() { - assertEquals("CommandResponse", - new CommandResponse(123, 332, "rid", "test").toString()); - } -} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ErrorResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ErrorResponseTests.java index e829e817fb3..ca4c8f6a1af 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ErrorResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ErrorResponseTests.java @@ -11,20 +11,20 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; import java.io.IOException; import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; -import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest; +import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest; public class ErrorResponseTests extends ESTestCase { static ErrorResponse randomErrorResponse() { - return new ErrorResponse(RequestType.COMMAND, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new ErrorResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5)); } public void testRoundTrip() throws IOException { - assertRoundTripCurrentVersion(randomCommandRequest(), randomErrorResponse()); + assertRoundTripCurrentVersion(randomQueryInitRequest(), randomErrorResponse()); } public void testToString() { - assertEquals("ErrorResponse", - new ErrorResponse(RequestType.COMMAND, "test", "test", "stack\nstack").toString()); + assertEquals("ErrorResponse", + new ErrorResponse(RequestType.QUERY_INIT, "test", "test", "stack\nstack").toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ExceptionResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ExceptionResponseTests.java index 0fd3ec51674..ef28f2c0e01 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ExceptionResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/ExceptionResponseTests.java @@ -12,21 +12,21 @@ import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionTyp import java.io.IOException; import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; -import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest; +import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest; public class ExceptionResponseTests extends ESTestCase { static ExceptionResponse randomExceptionResponse() { - return new ExceptionResponse(RequestType.COMMAND, randomAlphaOfLength(5), randomAlphaOfLength(5), + return new ExceptionResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5), randomFrom(SqlExceptionType.values())); } public void testRoundTrip() throws IOException { - assertRoundTripCurrentVersion(randomCommandRequest(), randomExceptionResponse()); + assertRoundTripCurrentVersion(randomQueryInitRequest(), randomExceptionResponse()); } public void testToString() { - assertEquals("ExceptionResponse", - new ExceptionResponse(RequestType.COMMAND, "test", "test", SqlExceptionType.SYNTAX).toString()); + assertEquals("ExceptionResponse", + new ExceptionResponse(RequestType.QUERY_INIT, "test", "test", SqlExceptionType.SYNTAX).toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequestTests.java new file mode 100644 index 00000000000..c27eb3f04b8 --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitRequestTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.IOException; +import java.util.TimeZone; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.randomTimeoutInfo; + +public class QueryInitRequestTests extends ESTestCase { + static QueryInitRequest randomQueryInitRequest() { + return new QueryInitRequest(randomAlphaOfLength(5), between(0, Integer.MAX_VALUE), randomTimeZone(random()), randomTimeoutInfo()); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryInitRequest()); + } + + public void testToString() { + assertEquals("QueryInitRequest", + new QueryInitRequest("SELECT * FROM test.doc", 10, TimeZone.getTimeZone("UTC"), new TimeoutInfo(1, 1, 1)).toString()); + assertEquals("QueryInitRequest", + new QueryInitRequest("SELECT * FROM test.doc", 10, TimeZone.getTimeZone("GMT-5"), new TimeoutInfo(1, 1, 1)).toString()); + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java new file mode 100644 index 00000000000..54d67faa0a6 --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest; + +public class QueryInitResponseTests extends ESTestCase { + static QueryInitResponse randomQueryInitResponse() { + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); + return new QueryInitResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryInitRequest(), randomQueryInitResponse()); + } + + public void testToString() { + assertEquals("QueryInitResponse", + new QueryInitResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java new file mode 100644 index 00000000000..0dc3112857c --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.randomTimeoutInfo; + +public class QueryPageRequestTests extends ESTestCase { + static QueryPageRequest randomQueryPageRequest() { + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); + return new QueryPageRequest(cursor, randomTimeoutInfo()); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryPageRequest()); + } + + public void testToString() { + assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1)).toString()); + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java new file mode 100644 index 00000000000..b6e278ff192 --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequestTests.randomQueryPageRequest; + +public class QueryPageResponseTests extends ESTestCase { + static QueryPageResponse randomQueryPageResponse() { + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); + return new QueryPageResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryPageRequest(), randomQueryPageResponse()); + } + + public void testToString() { + assertEquals("QueryPageResponse", + new QueryPageResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + } +} diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java index 69e291147e9..db83f2e63c7 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java @@ -5,9 +5,10 @@ */ package org.elasticsearch.xpack.sql.cli; -import org.elasticsearch.xpack.sql.cli.net.client.CliHttpClient; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse; import org.elasticsearch.xpack.sql.net.client.SuppressForbidden; import org.elasticsearch.xpack.sql.net.client.util.IOUtils; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest; import org.jline.keymap.BindingReader; import org.jline.reader.EndOfFileException; import org.jline.reader.LineReader; @@ -25,6 +26,8 @@ import java.io.PrintWriter; import java.util.Locale; import java.util.Properties; import java.util.logging.LogManager; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.jline.utils.AttributedStyle.BOLD; import static org.jline.utils.AttributedStyle.BRIGHT; @@ -39,7 +42,7 @@ public class Cli { LogManager.getLogManager().readConfiguration(Cli.class.getResourceAsStream("/logging.properties")); try (Terminal term = TerminalBuilder.builder().build()) { try { - Cli console = new Cli(new CliConfiguration("localhost:9200/_cli", new Properties()), term); + Cli console = new Cli(new CliConfiguration("localhost:9200/_sql/cli", new Properties()), term); console.run(); } catch (FatalException e) { term.writer().println(e.getMessage()); @@ -58,9 +61,12 @@ public class Cli { private final Keys keys; private final CliConfiguration cfg; private final CliHttpClient cliClient; + private int fetchSize = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE; + private String fetchSeparator = ""; Cli(CliConfiguration cfg, Terminal terminal) { term = terminal; + // NOCOMMIT figure out if we need to build these for side effects or not. We don't currently use them. bindingReader = new BindingReader(term.reader()); keys = new Keys(term); @@ -85,7 +91,7 @@ public class Cli { prompt = DEFAULT_PROMPT; out.flush(); - printLogo(out); + printLogo(); while (true) { String line = null; @@ -118,9 +124,6 @@ public class Cli { line = multiLine.toString().trim(); multiLine.setLength(0); } - // - // local commands - // // special case to handle exit if (isExit(line)) { @@ -128,23 +131,16 @@ public class Cli { out.flush(); return; } - if (isClear(line)) { - term.puts(Capability.clear_screen); - } - else if (isLogo(line)) { - printLogo(out); - } - - else { + boolean wasLocal = handleLocalCommand(line); + if (false == wasLocal) { try { if (isServerInfo(line)) { - executeServerInfo(out); - } - else { - executeCommand(line, out); + executeServerInfo(); + } else { + executeQuery(line); } } catch (RuntimeException e) { - handleExceptionWhileCommunicatingWithServer(out, e); + handleExceptionWhileCommunicatingWithServer(e); } out.println(); } @@ -157,12 +153,12 @@ public class Cli { * Handle an exception while communication with the server. Extracted * into a method so that tests can bubble the failure. */ - protected void handleExceptionWhileCommunicatingWithServer(PrintWriter out, RuntimeException e) { + protected void handleExceptionWhileCommunicatingWithServer(RuntimeException e) { AttributedStringBuilder asb = new AttributedStringBuilder(); asb.append("Communication error [", BOLD.foreground(RED)); asb.append(e.getMessage(), DEFAULT.boldOff().italic().foreground(YELLOW)); asb.append("]", BOLD.underlineOff().foreground(RED)); - out.println(asb.toAnsi(term)); + term.writer().println(asb.toAnsi(term)); } private static String logo() { @@ -176,38 +172,96 @@ public class Cli { } } - private void printLogo(PrintWriter out) { + private void printLogo() { term.puts(Capability.clear_screen); - out.println(logo()); - out.println(); + term.writer().println(logo()); + term.writer().println(); + } + + private static final Pattern LOGO_PATTERN = Pattern.compile("logo", Pattern.CASE_INSENSITIVE); + private static final Pattern CLEAR_PATTERN = Pattern.compile("cls", Pattern.CASE_INSENSITIVE); + private static final Pattern FETCH_SIZE_PATTERN = Pattern.compile("fetch(?: |_)size *= *(.+)", Pattern.CASE_INSENSITIVE); + private static final Pattern FETCH_SEPARATOR_PATTERN = Pattern.compile("fetch(?: |_)separator *= *\"(.+)\"", Pattern.CASE_INSENSITIVE); + private boolean handleLocalCommand(String line) { + Matcher m = LOGO_PATTERN.matcher(line); + if (m.matches()) { + printLogo(); + return true; + } + m = CLEAR_PATTERN.matcher(line); + if (m.matches()) { + term.puts(Capability.clear_screen); + return true; + } + m = FETCH_SIZE_PATTERN.matcher(line); + if (m.matches()) { + int proposedFetchSize; + try { + proposedFetchSize = fetchSize = Integer.parseInt(m.group(1)); + } catch (NumberFormatException e) { + AttributedStringBuilder asb = new AttributedStringBuilder(); + asb.append("Invalid fetch size [", BOLD.foreground(RED)); + asb.append(m.group(1), DEFAULT.boldOff().italic().foreground(YELLOW)); + asb.append("]", BOLD.underlineOff().foreground(RED)); + term.writer().println(asb.toAnsi(term)); + return true; + } + if (proposedFetchSize <= 0) { + AttributedStringBuilder asb = new AttributedStringBuilder(); + asb.append("Invalid fetch size [", BOLD.foreground(RED)); + asb.append(m.group(1), DEFAULT.boldOff().italic().foreground(YELLOW)); + asb.append("]. Must be > 0.", BOLD.underlineOff().foreground(RED)); + term.writer().println(asb.toAnsi(term)); + return true; + } + this.fetchSize = proposedFetchSize; + AttributedStringBuilder asb = new AttributedStringBuilder(); + asb.append("fetch size set to ", DEFAULT); + asb.append(Integer.toString(fetchSize), DEFAULT.foreground(BRIGHT)); + term.writer().println(asb.toAnsi(term)); + return true; + } + m = FETCH_SEPARATOR_PATTERN.matcher(line); + if (m.matches()) { + fetchSeparator = m.group(1); + AttributedStringBuilder asb = new AttributedStringBuilder(); + asb.append("fetch separator set to \"", DEFAULT); + asb.append(fetchSeparator, DEFAULT.foreground(BRIGHT)); + asb.append("\"", DEFAULT); + term.writer().println(asb.toAnsi(term)); + return true; + } + + return false; } - private static boolean isClear(String line) { - line = line.toLowerCase(Locale.ROOT); - return (line.equals("cls")); - } - private boolean isServerInfo(String line) { line = line.toLowerCase(Locale.ROOT); - return (line.equals("info")); + return line.equals("info"); } - private boolean isLogo(String line) { - line = line.toLowerCase(Locale.ROOT); - return (line.equals("logo")); - } - - private void executeServerInfo(PrintWriter out) { - out.println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term)); + private void executeServerInfo() { + term.writer().println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term)); } private static boolean isExit(String line) { line = line.toLowerCase(Locale.ROOT); - return (line.equals("exit") || line.equals("quit")); + return line.equals("exit") || line.equals("quit"); } - protected void executeCommand(String line, PrintWriter out) throws IOException { - out.print(ResponseToString.toAnsi(cliClient.command(line, null)).toAnsi(term)); + private void executeQuery(String line) throws IOException { + QueryResponse response = cliClient.queryInit(line, fetchSize); + while (true) { + term.writer().print(ResponseToString.toAnsi(response).toAnsi(term)); + term.writer().flush(); + if (response.cursor().length == 0) { + return; + } + if (false == fetchSeparator.equals("")) { + term.writer().println(fetchSeparator); + } + response = cliClient.nextPage(response.cursor()); + } } static class FatalException extends RuntimeException { diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java new file mode 100644 index 00000000000..8bc03f21285 --- /dev/null +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli; + +import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse; +import org.elasticsearch.xpack.sql.net.client.util.Bytes; +import org.elasticsearch.xpack.sql.protocol.shared.Request; +import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.TimeZone; + +public class CliHttpClient implements AutoCloseable { + private final HttpClient http; + + public CliHttpClient(CliConfiguration cfg) { + http = new HttpClient(cfg); + } + + public InfoResponse serverInfo() { + InfoRequest request = new InfoRequest(); + return (InfoResponse) sendRequest(request); + } + + public QueryInitResponse queryInit(String query, int fetchSize) { + // TODO allow customizing the time zone + // NOCOMMIT figure out Timeouts.... + QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), new TimeoutInfo(0, 0, 0)); + return (QueryInitResponse) sendRequest(request); + } + + public QueryPageResponse nextPage(byte[] cursor) { + // NOCOMMIT figure out Timeouts.... + QueryPageRequest request = new QueryPageRequest(cursor, new TimeoutInfo(0, 0, 0)); + return (QueryPageResponse) sendRequest(request); + } + + private Response sendRequest(Request request) { + Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out)); + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ba.bytes(), 0, ba.size()))) { + return Proto.INSTANCE.readResponse(request, in); + } catch (IOException ex) { + throw new CliException(ex, "Cannot read response"); + } + } + + public void close() {} +} + + diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/HttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/HttpClient.java similarity index 91% rename from sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/HttpClient.java rename to sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/HttpClient.java index 9109b1773e1..51b8a3c1f0e 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/HttpClient.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/HttpClient.java @@ -3,9 +3,8 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.cli.net.client; +package org.elasticsearch.xpack.sql.cli; -import org.elasticsearch.xpack.sql.cli.CliConfiguration; import org.elasticsearch.xpack.sql.net.client.ClientException; import org.elasticsearch.xpack.sql.net.client.JreHttpUrlConnection; import org.elasticsearch.xpack.sql.net.client.util.Bytes; diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/ResponseToString.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/ResponseToString.java index b5d1572a0ed..0236a5979bb 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/ResponseToString.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/ResponseToString.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.xpack.sql.cli; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; @@ -32,8 +32,8 @@ abstract class ResponseToString { static AttributedStringBuilder toAnsi(Response response) { AttributedStringBuilder sb = new AttributedStringBuilder(); - if (response instanceof CommandResponse) { - CommandResponse cmd = (CommandResponse) response; + if (response instanceof QueryResponse) { + QueryResponse cmd = (QueryResponse) response; if (cmd.data != null) { String data = cmd.data.toString(); if (data.startsWith("digraph ")) { diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/CliHttpClient.java deleted file mode 100644 index 07589b5b36e..00000000000 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/net/client/CliHttpClient.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.cli.net.client; - -import org.elasticsearch.xpack.sql.cli.CliConfiguration; -import org.elasticsearch.xpack.sql.cli.CliException; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; -import org.elasticsearch.xpack.sql.net.client.util.Bytes; -import org.elasticsearch.xpack.sql.protocol.shared.Response; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; - -public class CliHttpClient implements AutoCloseable { - private final HttpClient http; - - public CliHttpClient(CliConfiguration cfg) { - http = new HttpClient(cfg); - } - - public Response serverInfo() { - InfoRequest request = new InfoRequest(); - Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out)); - return doIO(ba, in -> Proto.INSTANCE.readResponse(request, in)); - } - - public Response command(String command, String requestId) { - CommandRequest request = new CommandRequest(command); - Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out)); - return doIO(ba, in -> Proto.INSTANCE.readResponse(request, in)); - } - - private static T doIO(Bytes ba, DataInputFunction action) { - try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ba.bytes(), 0, ba.size()))) { - return action.apply(in); - } catch (IOException ex) { - throw new CliException(ex, "Cannot read response"); - } - } - - public void close() {} - - @FunctionalInterface - private interface DataInputFunction { - R apply(DataInput in) throws IOException; - } -} - - diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliHttpServer.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliHttpServer.java index 11eaf71acfd..8c067aab63a 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliHttpServer.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliHttpServer.java @@ -6,15 +6,15 @@ package org.elasticsearch.xpack.sql.cli; import org.elasticsearch.client.Client; -import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.xpack.sql.test.server.ProtoHttpServer; /** * Internal server used for testing without starting a new Elasticsearch instance. */ -public class CliHttpServer extends ProtoHttpServer { +public class CliHttpServer extends ProtoHttpServer { public CliHttpServer(Client client) { - super(client, new CliProtoHandler(client), "/_cli"); + super(client, new CliProtoHandler(client), "/_sql/cli"); } @Override diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliIntegrationTestCase.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliIntegrationTestCase.java index 48509f88a5d..196cca54613 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliIntegrationTestCase.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliIntegrationTestCase.java @@ -67,7 +67,7 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase { @ClassRule public static final Supplier ES = EMBED_SQL ? new EmbeddedCliServer() : () -> - new CliConfiguration(System.getProperty("tests.rest.cluster") + "/_cli", new Properties()); + new CliConfiguration(System.getProperty("tests.rest.cluster") + "/_sql/cli", new Properties()); protected PrintWriter out; protected BufferedReader in; @@ -93,7 +93,7 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase { terminal.echo(false); Cli cli = new Cli(ES.get(), terminal) { @Override - protected void handleExceptionWhileCommunicatingWithServer(PrintWriter out, RuntimeException e) { + protected void handleExceptionWhileCommunicatingWithServer(RuntimeException e) { throw e; } }; diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java index c3cc2001db6..803cae2abec 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java @@ -8,33 +8,35 @@ package org.elasticsearch.xpack.sql.cli; import com.sun.net.httpserver.HttpExchange; import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xpack.sql.TestUtils; import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; -import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer; -import org.elasticsearch.xpack.sql.plugin.cli.CliServer; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto; -import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.plugin.RestSqlCliAction; +import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.test.server.ProtoHandler; import java.io.DataInput; import java.io.IOException; -import static org.elasticsearch.action.ActionListener.wrap; +class CliProtoHandler extends ProtoHandler { + private final NamedWriteableRegistry cursorRegistry = new NamedWriteableRegistry(Cursor.getNamedWriteables()); -class CliProtoHandler extends ProtoHandler { - - private final CliServer server; - CliProtoHandler(Client client) { - super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response)); - this.server = new CliServer(TestUtils.planExecutor(client), clusterName, - () -> info.getNode().getName(), info.getVersion(), info.getBuild()); + super(new EmbeddedModeFilterClient(client, TestUtils.planExecutor(client)), r -> r); } @Override protected void handle(HttpExchange http, DataInput in) throws IOException { - Request req = Proto.INSTANCE.readRequest(in); - server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex))); + FakeRestChannel channel = new FakeRestChannel(new FakeRestRequest(), true, 1); + try { + RestSqlCliAction.operation(cursorRegistry, Proto.INSTANCE.readRequest(in), client).accept(channel); + while (false == channel.await()) {} + sendHttpResponse(http, channel.capturedResponse().content()); + } catch (Exception e) { + fail(http, e); + } } } \ No newline at end of file diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/EmbeddedModeFilterClient.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/EmbeddedModeFilterClient.java new file mode 100644 index 00000000000..958cf413d37 --- /dev/null +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/EmbeddedModeFilterClient.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.FilterClient; +import org.elasticsearch.xpack.sql.execution.PlanExecutor; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; +import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction; + +/** + * Implements embedded sql mode by intercepting requests to SQL APIs and executing them locally. + */ +public class EmbeddedModeFilterClient extends FilterClient { + private final PlanExecutor planExecutor; + + public EmbeddedModeFilterClient(Client in, PlanExecutor planExecutor) { + super(in); + this.planExecutor = planExecutor; + } + + @Override + @SuppressWarnings("unchecked") + protected < Request extends ActionRequest, + Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder> + void doExecute(Action action, + Request request, ActionListener listener) { + if (action == SqlAction.INSTANCE) { + TransportSqlAction.operation(planExecutor, (SqlRequest) request, (ActionListener) listener); + } else { + super.doExecute(action, request, listener); + } + } +} diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/FetchSizeIT.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/FetchSizeIT.java new file mode 100644 index 00000000000..01c76a07464 --- /dev/null +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/FetchSizeIT.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; + +import java.io.IOException; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; + +/** + * Test for setting the fetch size. + */ +public class FetchSizeIT extends CliIntegrationTestCase { + public void testSelect() throws IOException { + StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < 20; i++) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"test_field\":" + i + "}\n"); + } + client().performRequest("PUT", "/test/doc/_bulk", singletonMap("refresh", "true"), + new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON)); + command("fetch size = 4"); + assertEquals("fetch size set to 4", in.readLine()); + command("fetch separator = \" -- fetch sep -- \""); + assertEquals("fetch separator set to \" -- fetch sep -- \"", in.readLine()); + command("SELECT * FROM test ORDER BY test_field ASC"); + assertThat(in.readLine(), containsString("test_field")); + assertThat(in.readLine(), containsString("----------")); + int i = 0; + while (i < 20) { + assertThat(in.readLine(), containsString(Integer.toString(i++))); + assertThat(in.readLine(), containsString(Integer.toString(i++))); + assertThat(in.readLine(), containsString(Integer.toString(i++))); + assertThat(in.readLine(), containsString(Integer.toString(i++))); + assertEquals(" -- fetch sep -- ", in.readLine()); + } + assertEquals("", in.readLine()); + } + + public void testInvalidFetchSize() throws IOException { + command("fetch size = cat"); + assertEquals("Invalid fetch size [cat]", in.readLine()); + command("fetch size = 0"); + assertEquals("Invalid fetch size [0]. Must be > 0.", in.readLine()); + command("fetch size = -1231"); + assertEquals("Invalid fetch size [-1231]. Must be > 0.", in.readLine()); + command("fetch size = " + Long.MAX_VALUE); + assertEquals("Invalid fetch size [" + Long.MAX_VALUE + "]", in.readLine()); + } +} diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java index cd35022fd95..801de5108e9 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java @@ -6,9 +6,10 @@ package org.elasticsearch.xpack.sql.cli; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse; import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType; import org.jline.terminal.Terminal; import org.jline.utils.AttributedStringBuilder; @@ -17,8 +18,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ResponseToStringTests extends ESTestCase { - public void testCommandResponse() { - AttributedStringBuilder s = ResponseToString.toAnsi(new CommandResponse(123, 223, "test", "some command response")); + public void testQueryInitResponse() { + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, new byte[0], "some command response")); + assertEquals("some command response", unstyled(s)); + assertEquals("[37msome command response[0m", fullyStyled(s)); + } + + public void testQueryPageResponse() { + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, new byte[0], "some command response")); assertEquals("some command response", unstyled(s)); assertEquals("[37msome command response[0m", fullyStyled(s)); } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequest.java index 8cc63dfcfa8..f1b724836a1 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequest.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequest.java @@ -6,71 +6,24 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; -import org.elasticsearch.xpack.sql.protocol.shared.Request; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.util.Objects; import java.util.TimeZone; -public class QueryInitRequest extends Request { - public final String query; - public final int fetchSize; - public final TimeZone timeZone; - public final TimeoutInfo timeout; - +public class QueryInitRequest extends AbstractQueryInitRequest { public QueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) { - this.query = query; - this.fetchSize = fetchSize; - this.timeZone = timeZone; - this.timeout = timeout; + super(query, fetchSize, timeZone, timeout); } QueryInitRequest(int clientVersion, DataInput in) throws IOException { - query = in.readUTF(); - fetchSize = in.readInt(); - timeZone = TimeZone.getTimeZone(in.readUTF()); - timeout = new TimeoutInfo(in); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(query); - out.writeInt(fetchSize); - out.writeUTF(timeZone.getID()); - timeout.write(out); - } - - @Override - protected String toStringBody() { - StringBuilder b = new StringBuilder(); - b.append("query=[").append(query).append(']'); - if (false == timeZone.getID().equals("UTC")) { - b.append(" timeZone=[").append(timeZone.getID()).append(']'); - } - return b.toString(); + super(clientVersion, in); } @Override public RequestType requestType() { return RequestType.QUERY_INIT; } - - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { - return false; - } - QueryInitRequest other = (QueryInitRequest) obj; - return fetchSize == other.fetchSize - && Objects.equals(query, other.query) - && Objects.equals(timeout, other.timeout) - && Objects.equals(timeZone.getID(), other.timeZone.getID()); - } - - @Override - public int hashCode() { - return Objects.hash(fetchSize, query, timeout, timeZone.getID().hashCode()); - } } \ No newline at end of file diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java index 285732ced80..100f94b56db 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse; import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; import java.io.DataInput; import java.io.DataOutput; @@ -19,32 +19,25 @@ import java.util.Objects; import static java.util.Collections.unmodifiableList; -public class QueryInitResponse extends Response { - public final long serverTimeQueryReceived, serverTimeResponseSent; - public final String requestId; +public class QueryInitResponse extends AbstractQueryResponse { public final List columns; public final Payload data; - public QueryInitResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, List columns, - Payload data) { - this.serverTimeQueryReceived = serverTimeQueryReceived; - this.serverTimeResponseSent = serverTimeResponseSent; - this.requestId = requestId; + public QueryInitResponse(long tookNanos, byte[] cursor, List columns, Payload data) { + super(tookNanos, cursor); this.columns = columns; this.data = data; } QueryInitResponse(Request request, DataInput in) throws IOException { - serverTimeQueryReceived = in.readLong(); - serverTimeResponseSent = in.readLong(); - requestId = in.readUTF(); + super(request, in); int size = in.readInt(); List columns = new ArrayList<>(size); for (int i = 0; i < size; i++) { columns.add(new ColumnInfo(in)); } this.columns = unmodifiableList(columns); - //NOCOMMIT - Page is a client class, it shouldn't leak here + // NOCOMMIT - Page is a client class, it shouldn't leak here Page data = new Page(columns); data.read(in); this.data = data; @@ -52,9 +45,7 @@ public class QueryInitResponse extends Response { @Override public void write(int clientVersion, DataOutput out) throws IOException { - out.writeLong(serverTimeQueryReceived); - out.writeLong(serverTimeResponseSent); - out.writeUTF(requestId); + super.write(clientVersion, out); out.writeInt(columns.size()); for (ColumnInfo c : columns) { c.write(out); @@ -64,10 +55,8 @@ public class QueryInitResponse extends Response { @Override protected String toStringBody() { - return "timeReceived=[" + serverTimeQueryReceived - + "] timeSent=[" + serverTimeResponseSent - + "] requestId=[" + requestId - + "] columns=" + columns + return super.toStringBody() + + " columns=" + columns + " data=[\n" + data + "]"; } @@ -83,19 +72,16 @@ public class QueryInitResponse extends Response { @Override public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { + if (false == super.equals(obj)) { return false; } QueryInitResponse other = (QueryInitResponse) obj; - return serverTimeQueryReceived == other.serverTimeQueryReceived - && serverTimeResponseSent == other.serverTimeResponseSent - && requestId.equals(other.requestId) - && columns.equals(other.columns); + return columns.equals(other.columns); // NOCOMMIT data } @Override public int hashCode() { - return Objects.hash(serverTimeQueryReceived, serverTimeResponseSent, requestId, columns); // NOCOMMIT data + return Objects.hash(super.hashCode(), columns); // NOCOMMIT data } } \ No newline at end of file diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java index 3e52c269097..4064da1ae72 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java @@ -6,71 +6,34 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest; import org.elasticsearch.xpack.sql.protocol.shared.Nullable; -import org.elasticsearch.xpack.sql.protocol.shared.Request; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.util.Objects; -public class QueryPageRequest extends Request { - public final String requestId; - public final TimeoutInfo timeout; +public class QueryPageRequest extends AbstractQueryPageRequest { private final transient Payload data; - public QueryPageRequest(String requestId, TimeoutInfo timeout, @Nullable Payload data) { - if (requestId == null) { - throw new IllegalArgumentException("[requestId] must not be null"); - } - if (timeout == null) { - throw new IllegalArgumentException("[timeout] must not be null"); - } - this.requestId = requestId; - this.timeout = timeout; + public QueryPageRequest(byte[] cursor, TimeoutInfo timeout, @Nullable Payload data) { + super(cursor, timeout); this.data = data; } QueryPageRequest(int clientVersion, DataInput in) throws IOException { - this.requestId = in.readUTF(); - this.timeout = new TimeoutInfo(in); - this.data = null; // Data isn't used on the server side - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(requestId); - timeout.write(out); + super(clientVersion, in); + this.data = null; // data isn't used on the server side } public Payload data() { return data; } - @Override - protected String toStringBody() { - return requestId; - } - @Override public RequestType requestType() { return RequestType.QUERY_PAGE; } - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { - return false; - } - QueryPageRequest other = (QueryPageRequest) obj; - return requestId.equals(other.requestId) - && timeout.equals(other.timeout); - // data is intentionally ignored - } - - @Override - public int hashCode() { - return Objects.hash(requestId, timeout); - // data is intentionally ignored - } + // not overriding hashCode and equals because we're intentionally ignore the data field } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java index 563099a4af4..bc43a0b6cb2 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java @@ -7,31 +7,24 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse; import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Objects; -public class QueryPageResponse extends Response { - public final String requestId; +public class QueryPageResponse extends AbstractQueryResponse { private final Payload data; - public QueryPageResponse(String requestId, Page data) { - if (requestId == null) { - throw new IllegalArgumentException("[requestId] must not be null"); - } - if (data == null) { - throw new IllegalArgumentException("[data] must not be null"); - } - this.requestId = requestId; + public QueryPageResponse(long tookNanos, byte[] cursor, Payload data) { + super(tookNanos, cursor); this.data = data; } QueryPageResponse(Request request, DataInput in) throws IOException { - this.requestId = in.readUTF(); + super(request, in); QueryPageRequest queryPageRequest = (QueryPageRequest) request; data = queryPageRequest.data(); queryPageRequest.data().read(in); @@ -39,14 +32,13 @@ public class QueryPageResponse extends Response { @Override public void write(int clientVersion, DataOutput out) throws IOException { - out.writeUTF(requestId); + super.write(clientVersion, out); data.write(out); } @Override protected String toStringBody() { - return "requestId=[" + requestId - + "] data=[\n" + data + "]"; + return super.toStringBody() + " data=[\n" + data + "]"; } @Override @@ -61,16 +53,15 @@ public class QueryPageResponse extends Response { @Override public boolean equals(Object obj) { - if (obj == null || obj.getClass() != getClass()) { + if (false == super.equals(obj)) { return false; } QueryPageResponse other = (QueryPageResponse) obj; - return requestId.equals(other.requestId) - && data.equals(other.data); + return data.equals(other.data); } @Override public int hashCode() { - return Objects.hash(requestId, data); + return Objects.hash(data); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/JdbcRoundTripTestUtils.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/JdbcRoundTripTestUtils.java index 43bfc20dd71..68765c10526 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/JdbcRoundTripTestUtils.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/JdbcRoundTripTestUtils.java @@ -7,11 +7,14 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import org.elasticsearch.xpack.sql.test.RoundTripTestUtils; import java.io.IOException; import java.util.function.Supplier; +import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; + public final class JdbcRoundTripTestUtils { private JdbcRoundTripTestUtils() { // Just static utilities @@ -26,4 +29,8 @@ public final class JdbcRoundTripTestUtils { (r, out) -> Proto.INSTANCE.writeResponse(r, Proto.CURRENT_VERSION, out), in -> Proto.INSTANCE.readResponse(request.get(), in)); } + + static TimeoutInfo randomTimeoutInfo() { + return new TimeoutInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequestTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequestTests.java index 61286e56508..452c24ec7dd 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequestTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitRequestTests.java @@ -6,16 +6,17 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import java.io.IOException; import java.util.TimeZone; import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion; -import static org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfoTests.randomTimeoutInfo; +import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo; public class QueryInitRequestTests extends ESTestCase { - public static QueryInitRequest randomQueryInitRequest() { + static QueryInitRequest randomQueryInitRequest() { return new QueryInitRequest(randomAlphaOfLength(5), between(0, Integer.MAX_VALUE), randomTimeZone(random()), randomTimeoutInfo()); } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java index d7ff14545d2..f819d2dcd37 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java @@ -16,8 +16,10 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.PageTests.randomPage public class QueryInitResponseTests extends ESTestCase { static QueryInitResponse randomQueryInitResponse() { + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); Page page = randomPage(); - return new QueryInitResponse(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(5), page.columnInfo(), page); + return new QueryInitResponse(randomNonNegativeLong(), cursor, page.columnInfo(), page); } public void testRoundTrip() throws IOException { @@ -29,8 +31,8 @@ public class QueryInitResponseTests extends ESTestCase { new Object[] {"test"}, new Object[] {"string"}, }); - assertEquals("QueryInitResponse] data=[" + assertEquals("QueryInitResponse] data=[" + "\ntest\nstring\n]>", - new QueryInitResponse(123, 456, "test_id", page.columnInfo(), page).toString()); + new QueryInitResponse(123, new byte[] {0x01, 0x20}, page.columnInfo(), page).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java index 8dced293faa..0f4a33dbe09 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java @@ -6,15 +6,18 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import java.io.IOException; import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion; -import static org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfoTests.randomTimeoutInfo; +import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo; public class QueryPageRequestTests extends ESTestCase { - public static QueryPageRequest randomQueryPageRequest(Page page) { - return new QueryPageRequest(randomAlphaOfLength(5), randomTimeoutInfo(), page); + static QueryPageRequest randomQueryPageRequest(Page page) { + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); + return new QueryPageRequest(cursor, randomTimeoutInfo(), page); } public void testRoundTrip() throws IOException { @@ -22,6 +25,6 @@ public class QueryPageRequestTests extends ESTestCase { } public void testToString() { - assertEquals("QueryPageRequest", new QueryPageRequest("test_id", new TimeoutInfo(1, 1, 1), null).toString()); + assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1), null).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java index 0968a17252f..538e44c1b97 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java @@ -17,7 +17,9 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequestTest public class QueryPageResponseTests extends ESTestCase { static QueryPageResponse randomQueryPageResponse(Page page) { - return new QueryPageResponse(randomAlphaOfLength(5), page); + byte[] cursor = new byte[between(0, 5)]; + random().nextBytes(cursor); + return new QueryPageResponse(randomNonNegativeLong(), cursor, page); } public void testRoundTrip() throws IOException { @@ -29,6 +31,7 @@ public class QueryPageResponseTests extends ESTestCase { Page results = new Page(singletonList(varcharInfo("a")), new Object[][] { new Object[] {"test"} }); - assertEquals("QueryPageResponse", new QueryPageResponse("test_id", results).toString()); + assertEquals("QueryPageResponse", + new QueryPageResponse(123, new byte[] {0x08, 0x10}, results).toString()); } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java index 678058c13d9..17eb56d54d7 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.sql.jdbc.net.client; import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Page; -import org.elasticsearch.xpack.sql.net.client.util.StringUtils; import java.sql.SQLException; import java.util.List; @@ -19,19 +18,15 @@ class DefaultCursor implements Cursor { private final Page page; private int row = -1; - private String requestId; + private byte[] cursor; - DefaultCursor(JdbcHttpClient client, String scrollId, Page page, RequestMeta meta) { + DefaultCursor(JdbcHttpClient client, byte[] cursor, Page page, RequestMeta meta) { this.client = client; this.meta = meta; - this.requestId = simplifyScrollId(scrollId); + this.cursor = cursor; this.page = page; } - private static String simplifyScrollId(String scrollId) { - return StringUtils.hasText(scrollId) ? scrollId : null; - } - @Override public List columns() { return page.columnInfo(); @@ -44,8 +39,8 @@ class DefaultCursor implements Cursor { return true; } else { - if (requestId != null) { - requestId = simplifyScrollId(client.nextPage(requestId, page, meta)); + if (cursor.length != 0) { + cursor = client.nextPage(cursor, page, meta); row = -1; return next(); } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java index efe4d1de415..75e5b6a5ab4 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java @@ -23,12 +23,12 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse; -import org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfo; import org.elasticsearch.xpack.sql.jdbc.util.BytesArray; import org.elasticsearch.xpack.sql.jdbc.util.FastByteArrayInputStream; import org.elasticsearch.xpack.sql.net.client.util.StringUtils; import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; import java.io.Closeable; import java.io.DataInput; @@ -69,17 +69,17 @@ public class JdbcHttpClient implements Closeable { QueryInitRequest request = new QueryInitRequest(sql, fetch, conCfg.timeZone(), timeout(meta)); BytesArray ba = http.put(out -> Proto.INSTANCE.writeRequest(request, out)); QueryInitResponse response = doIO(ba, in -> (QueryInitResponse) readResponse(request, in)); - return new DefaultCursor(this, response.requestId, (Page) response.data, meta); + return new DefaultCursor(this, response.cursor(), (Page) response.data, meta); } /** * Read the next page of results, updating the {@link Page} and returning * the scroll id to use to fetch the next page. */ - public String nextPage(String requestId, Page page, RequestMeta meta) throws SQLException { - QueryPageRequest request = new QueryPageRequest(requestId, timeout(meta), page); + public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException { + QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page); BytesArray ba = http.put(out -> Proto.INSTANCE.writeRequest(request, out)); - return doIO(ba, in -> ((QueryPageResponse) readResponse(request, in)).requestId); + return doIO(ba, in -> ((QueryPageResponse) readResponse(request, in)).cursor()); } public InfoResponse serverInfo() throws SQLException { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatter.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatter.java new file mode 100644 index 00000000000..da7dd476766 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatter.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Formats {@link SqlResponse} for the CLI. {@linkplain Writeable} so + * that its state can be saved between pages of results. + */ +public class CliFormatter implements Writeable { + /** + * The minimum width for any column in the formatted results. + */ + private static final int MIN_COLUMN_WIDTH = 15; + + private int[] width; + + /** + * Create a new {@linkplain CliFormatter} for formatting responses similar + * to the provided {@link SqlResponse}. + */ + public CliFormatter(SqlResponse response) { + // Figure out the column widths: + // 1. Start with the widths of the column names + width = new int[response.columns().size()]; + for (int i = 0; i < width.length; i++) { + // TODO read the width from the data type? + width[i] = Math.max(MIN_COLUMN_WIDTH, response.columns().get(i).name().length()); + } + + // 2. Expand columns to fit the largest value + for (List row : response.rows()) { + for (int i = 0; i < width.length; i++) { + // NOCOMMIT are we sure toString is correct here? What about dates that come back as longs. + width[i] = Math.max(width[i], Objects.toString(row.get(i)).length()); + } + } + } + + public CliFormatter(StreamInput in) throws IOException { + width = in.readIntArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeIntArray(width); + } + + /** + * Format the provided {@linkplain SqlResponse} for the CLI + * including the header lines. + */ + public String formatWithHeader(SqlResponse response) { + // The header lines + StringBuilder sb = new StringBuilder(estimateSize(response.rows().size() + 2)); + for (int i = 0; i < width.length; i++) { + if (i > 0) { + sb.append('|'); + } + + String name = response.columns().get(i).name(); + // left padding + int leftPadding = (width[i] - name.length()) / 2; + for (int j = 0; j < leftPadding; j++) { + sb.append(' '); + } + sb.append(name); + // right padding + for (int j = 0; j < width[i] - name.length() - leftPadding; j++) { + sb.append(' '); + } + } + sb.append('\n'); + + for (int i = 0; i < width.length; i++) { + if (i > 0) { + sb.append('+'); + } + for (int j = 0; j < width[i]; j++) { + sb.append('-'); // emdash creates issues + } + } + sb.append('\n'); + + + /* Now format the results. Sadly, this means that column + * widths are entirely determined by the first batch of + * results. */ + return formatWithoutHeader(sb, response); + } + + /** + * Format the provided {@linkplain SqlResponse} for the CLI + * without the header lines. + */ + public String formatWithoutHeader(SqlResponse response) { + return formatWithoutHeader(new StringBuilder(estimateSize(response.rows().size())), response).toString(); + } + + private String formatWithoutHeader(StringBuilder sb, SqlResponse response) { + for (List row : response.rows()) { + for (int i = 0; i < width.length; i++) { + if (i > 0) { + sb.append('|'); + } + + // NOCOMMIT are we sure toString is correct here? What about dates that come back as longs. + String string = Objects.toString(row.get(i)); + if (string.length() <= width[i]) { + // Pad + sb.append(string); + int padding = width[i] - string.length(); + for (int p = 0; p < padding; p++) { + sb.append(' '); + } + } else { + // Trim + sb.append(string.substring(0, width[i] - 1)); + sb.append('~'); + } + } + sb.append('\n'); + } + return sb.toString(); + } + + /** + * Pick a good estimate of the buffer size needed to contain the rows. + */ + int estimateSize(int rows) { + /* Each column has either a '|' or a '\n' after it + * so initialize size to number of columns then add + * up the actual widths of each column. */ + int rowWidthEstimate = width.length; + for (int w : width) { + rowWidthEstimate += w; + } + return rowWidthEstimate * rows; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java new file mode 100644 index 00000000000..8b1f6a59ea1 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.main.MainAction; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest; +import org.elasticsearch.xpack.sql.protocol.shared.Request; +import org.elasticsearch.xpack.sql.protocol.shared.Response; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.joda.time.DateTimeZone; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.OK; + +public class RestSqlCliAction extends BaseRestHandler { + private final NamedWriteableRegistry cursorRegistry = new NamedWriteableRegistry(Cursor.getNamedWriteables()); + + public RestSqlCliAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(POST, "/_sql/cli", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request; + try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) { + request = Proto.INSTANCE.readRequest(in); + } + Consumer consumer = operation(cursorRegistry, request, client); + return consumer::accept; + } + + /** + * Actual implementation of the operation + */ + public static Consumer operation(NamedWriteableRegistry cursorRegistry, Request request, Client client) + throws IOException { + RequestType requestType = (RequestType) request.requestType(); + switch (requestType) { + case INFO: + return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response -> + new InfoResponse(response.getNodeName(), response.getClusterName().value(), + response.getVersion().major, response.getVersion().minor, response.getVersion().toString(), + response.getBuild().shortHash(), response.getBuild().date()))); + case QUERY_INIT: + return queryInit(client, (QueryInitRequest) request); + case QUERY_PAGE: + return queryPage(cursorRegistry, client, (QueryPageRequest) request); + default: + throw new IllegalArgumentException("Unsupported action [" + requestType + "]"); + } + } + + private static Consumer queryInit(Client client, QueryInitRequest request) { + // TODO time zone support for CLI + SqlRequest sqlRequest = new SqlRequest(request.query, SqlRequest.DEFAULT_TIME_ZONE, Cursor.EMPTY) + .timeZone(DateTimeZone.forTimeZone(request.timeZone)) + .fetchSize(request.fetchSize); + long start = System.nanoTime(); + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { + CliFormatter formatter = new CliFormatter(response); + String data = formatter.formatWithHeader(response); + return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + })); + } + + private static Consumer queryPage(NamedWriteableRegistry cursorRegistry, Client client, QueryPageRequest request) { + Cursor cursor; + CliFormatter formatter; + try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), cursorRegistry)) { + cursor = in.readNamedWriteable(Cursor.class); + formatter = new CliFormatter(in); + } catch (IOException e) { + throw new IllegalArgumentException("error reading the cursor"); + } + SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, cursor); + long start = System.nanoTime(); + return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { + String data = formatter.formatWithoutHeader(response); + return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + })); + } + + private static byte[] serializeCursor(Cursor cursor, CliFormatter formatter) { + if (cursor == Cursor.EMPTY) { + return new byte[0]; + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeNamedWriteable(cursor); + formatter.writeTo(out); + return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes; + } catch (IOException e) { + throw new RuntimeException("unexpected trouble building the cursor", e); + } + } + + private static ActionListener toActionListener(RestChannel channel, Function responseBuilder) { + // NOCOMMIT error response + return new RestActionListener(channel) { + @Override + protected void processResponse(T actionResponse) throws Exception { + Response response = responseBuilder.apply(actionResponse); + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { + // NOCOMMIT use the version from the client + Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream); + } + channel.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes())); + } + } + }; + } + + @Override + public String getName() { + return "xpack_sql_cli_action"; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java index 1789eeb29ce..699a45be4ba 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java @@ -27,9 +27,6 @@ import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog; import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog; import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliHttpHandler; -import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction; import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction; import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcHttpHandler; import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction; @@ -81,14 +78,13 @@ public class SqlPlugin implements ActionPlugin { IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { return Arrays.asList(new RestSqlAction(settings, restController), - new CliHttpHandler(settings, restController), + new RestSqlCliAction(settings, restController), new JdbcHttpHandler(settings, restController)); } @Override public List> getActions() { return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class), - new ActionHandler<>(CliAction.INSTANCE, TransportCliAction.class), new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class), new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliServer.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliServer.java deleted file mode 100644 index 6dc9b6fcdd8..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliServer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli; - -import org.elasticsearch.Build; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse; -import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse; -import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse; -import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; -import org.elasticsearch.xpack.sql.execution.PlanExecutor; -import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor; -import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType; -import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; -import org.elasticsearch.xpack.sql.util.StringUtils; - -import java.util.function.Supplier; - -import static org.elasticsearch.action.ActionListener.wrap; -import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY; - -public class CliServer extends AbstractSqlServer { - - private final PlanExecutor executor; - private final Supplier infoResponse; - - public CliServer(PlanExecutor executor, String clusterName, Supplier nodeName, Version version, Build build) { - this.executor = executor; - // Delay building the response until runtime because the node name is not available at startup - this.infoResponse = () -> new InfoResponse(nodeName.get(), clusterName, version.major, version.minor, version.toString(), - build.shortHash(), build.date()); - } - - @Override - protected void innerHandle(Request req, ActionListener listener) { - RequestType requestType = (RequestType) req.requestType(); - try { - switch (requestType) { - case INFO: - listener.onResponse(info((InfoRequest) req)); - break; - case COMMAND: - command((CommandRequest) req, listener); - break; - default: - throw new IllegalArgumentException("Unsupported action [" + requestType + "]"); - } - } catch (Exception ex) { - listener.onResponse(exceptionResponse(req, ex)); - } - } - - @Override - protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) { - return new ErrorResponse((RequestType) request.requestType(), message, cause, stack); - } - - @Override - protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause, - SqlExceptionType exceptionType) { - return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType); - } - - public InfoResponse info(InfoRequest req) { - return infoResponse.get(); - } - - public void command(CommandRequest req, ActionListener listener) { - final long start = System.currentTimeMillis(); // NOCOMMIT should be nanoTime or else clock skew will skew us - - // NOCOMMIT: need to add settings for CLI - // TODO support non-utc for cli server - executor.sql(req.command, wrap( - c -> { - long stop = System.currentTimeMillis(); - String requestId = EMPTY; - if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) { - requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId()); - } - - // NOCOMMIT it looks like this tries to buffer the entire response in memory before returning it which is going to OOM some po - // NOCOMMIT also this blocks the current thread while it iterates the cursor - listener.onResponse(new CommandResponse(start, stop, requestId, CliUtils.toString(c))); - }, - ex -> listener.onResponse(exceptionResponse(req, ex)))); - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliUtils.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliUtils.java deleted file mode 100644 index c66cb1eee52..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/CliUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.elasticsearch.xpack.sql.session.RowSetCursor; - -abstract class CliUtils { - - // this toString is a bit convoluted since it tries to be smart and pad the columns according to their values - // as such it will look inside the row, find the max for each column and pad all the values accordingly - // things are more complicated when the value is split across multiple lines (like a plan) in which case - // a row needs to be iterated upon to fill up the values that don't take extra lines - - // Warning: this method _consumes_ a rowset - static String toString(RowSetCursor cursor) { - if (cursor.rowSize() == 1 && cursor.size() == 1 && cursor.column(0).toString().startsWith("digraph ")) { - return cursor.column(0).toString(); - } - - StringBuilder sb = new StringBuilder(); - - // use the schema as a header followed by - // do a best effort to compute the width of the column - int[] width = new int[cursor.rowSize()]; - - for (int i = 0; i < cursor.rowSize(); i++) { - // TODO: default schema width - width[i] = Math.max(15, cursor.schema().names().get(i).length()); - } - - AtomicBoolean firstRun = new AtomicBoolean(true); - // take a look at the first values - cursor.forEachSet(rowSet -> { - for (boolean shouldRun = rowSet.hasCurrentRow(); shouldRun; shouldRun = rowSet.advanceRow()) { - for (int column = 0; column < rowSet.rowSize(); column++) { - if (column > 0) { - sb.append("|"); - } - - String val = String.valueOf(rowSet.column(column)); - // the value might contain multiple lines (plan execution for example) - // TODO: this needs to be improved to properly scale each row across multiple lines - String[] split = val.split("\\n"); - - if (firstRun.get()) { - // find max across splits - for (int splitIndex = 0; splitIndex < split.length; splitIndex++) { - width[column] = Math.max(width[column], split[splitIndex].length()); - } - } - - for (int splitIndex = 0; splitIndex < split.length; splitIndex++) { - if (splitIndex > 0) { - sb.append("\n"); - } - String string = split[splitIndex]; - - // does the value fit the column ? - if (string.length() <= width[column]) { - sb.append(string); - // pad value - for (int k = 0; k < width[column] - string.length(); k++) { - sb.append(" "); - } - } - // no, then trim it - else { - sb.append(string.substring(0, width[column] - 1)); - sb.append("~"); - } - } - } - - sb.append("\n"); - firstRun.set(false); - } - }); - - // compute the header - StringBuilder header = new StringBuilder(); - - for (int i = 0; i < cursor.rowSize(); i++) { - if (i > 0) { - header.append("|"); - } - - String name = cursor.schema().names().get(i); - // left padding - int leftPadding = (width[i] - name.length()) / 2; - for (int j = 0; j < leftPadding; j++) { - header.append(" "); - } - header.append(name); - // right padding - for (int j = 0; j < width[i] - name.length() - leftPadding; j++) { - header.append(" "); - } - } - - header.append("\n"); - for (int i = 0; i < cursor.rowSize(); i++) { - if (i > 0) { - header.append("+"); - } - for (int j = 0; j < width[i]; j++) { - header.append("-"); // emdash creates issues - } - } - - header.append("\n"); - - // append the header - sb.insert(0, header.toString()); - - return sb.toString(); - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliAction.java deleted file mode 100644 index 57072cb88e5..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliAction.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.client.ElasticsearchClient; - -public class CliAction extends Action { - - public static final CliAction INSTANCE = new CliAction(); - public static final String NAME = "indices:data/read/sql/cli"; - - private CliAction() { - super(NAME); - } - - @Override - public CliRequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new CliRequestBuilder(client, this); - } - - @Override - public CliResponse newResponse() { - return new CliResponse(); - } -} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java deleted file mode 100644 index 9aa1959166c..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliHttpHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; - -import java.io.IOException; - -import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; -import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.RestStatus.OK; - -public class CliHttpHandler extends BaseRestHandler { - - public CliHttpHandler(Settings settings, RestController controller) { - super(settings); - controller.registerHandler(POST, "/_cli", this); - } - - @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - if (!request.hasContent()) { - throw new IllegalArgumentException("expected a request body"); - } - - CliRequest cliRequest = new CliRequest(request.content()); - return c -> client.executeLocally(CliAction.INSTANCE, cliRequest, - ActionListener.wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())), - ex -> error(c, ex))); - } - - private static void error(RestChannel channel, Exception ex) { - BytesRestResponse response; - try { - response = new BytesRestResponse(channel, ex); - } catch (IOException e) { - response = new BytesRestResponse(OK, TEXT_CONTENT_TYPE, ExceptionsHelper.stackTrace(e)); - } - channel.sendResponse(response); - } - - @Override - public String getName() { - return "sql_cli_action"; - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java deleted file mode 100644 index 91b9f7a6fd4..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.CompositeIndicesRequest; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; -import org.elasticsearch.xpack.sql.protocol.shared.Request; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Objects; - -import static org.elasticsearch.action.ValidateActions.addValidationError; - -public class CliRequest extends ActionRequest implements CompositeIndicesRequest { - - private BytesReference bytesReference; - - public CliRequest() { - } - - public CliRequest(Request request) { - try { - request(request); - } catch (IOException ex) { - throw new IllegalArgumentException("cannot serialize the request", ex); - } - } - - public CliRequest(BytesReference bytesReference) { - this.bytesReference = bytesReference; - } - - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (bytesReference == null) { - validationException = addValidationError("no request has been specified", validationException); - } - return validationException; - } - - /** - * Gets the response object from internally stored serialized version - */ - public Request request() throws IOException { - try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { - return Proto.INSTANCE.readRequest(in); - } - } - - /** - * Converts the response object into internally stored serialized version - */ - public CliRequest request(Request request) throws IOException { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { - Proto.INSTANCE.writeRequest(request, dataOutputStream); - } - bytesReference = bytesStreamOutput.bytes(); - } - return this; - } - - public BytesReference bytesReference() { - return bytesReference; - } - - @Override - public int hashCode() { - return Objects.hash(bytesReference); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - CliRequest other = (CliRequest) obj; - return Objects.equals(bytesReference, other.bytesReference); - } - - @Override - public String getDescription() { - try { - return "SQL CLI [" + request() + "]"; - } catch (IOException ex) { - return "SQL CLI [" + ex.getMessage() + "]"; - } - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - bytesReference = in.readBytesReference(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBytesReference(bytesReference); - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java deleted file mode 100644 index 35ab468ae89..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.xpack.sql.protocol.shared.Request; - -import java.io.IOException; - -public class CliRequestBuilder extends ActionRequestBuilder { - - public CliRequestBuilder(ElasticsearchClient client, CliAction action) { - super(client, action, new CliRequest()); - } - - public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) { - super(client, action, new CliRequest(req)); - } - - public CliRequestBuilder request(Request req)throws IOException { - request.request(req); - return this; - } -} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java deleted file mode 100644 index 975e42b816b..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponse.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; -import org.elasticsearch.xpack.sql.protocol.shared.Request; -import org.elasticsearch.xpack.sql.protocol.shared.Response; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Objects; - -public class CliResponse extends ActionResponse { - private BytesReference bytesReference; - - public CliResponse() { - } - - public CliResponse(BytesReference bytesReference) { - this.bytesReference = bytesReference; - } - - public CliResponse(Response response) { - try { - response(response); - } catch (IOException ex) { - throw new IllegalArgumentException("cannot serialize the request", ex); - } - } - - /** - * Gets the response object from internally stored serialized version - * - * @param request the request that was used to generate this response - */ - public Response response(Request request) throws IOException { - try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) { - return Proto.INSTANCE.readResponse(request, in); - } - } - - /** - * Serialized the response object into internally stored serialized version - */ - public CliResponse response(Response response) throws IOException { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) { - Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream); - } - bytesReference = bytesStreamOutput.bytes(); - } - return this; - } - - public BytesReference bytesReference() { - return bytesReference; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - bytesReference = in.readBytesReference(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBytesReference(bytesReference); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CliResponse that = (CliResponse) o; - return Objects.equals(bytesReference, that.bytesReference); - } - - @Override - public int hashCode() { - return Objects.hash(bytesReference); - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java deleted file mode 100644 index ec2a6b08d14..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.Build; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.sql.execution.PlanExecutor; -import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker; -import org.elasticsearch.xpack.sql.plugin.cli.CliServer; -import org.elasticsearch.xpack.sql.protocol.shared.Request; - -import java.io.IOException; - -import static org.elasticsearch.xpack.sql.util.ActionUtils.chain; - -public class TransportCliAction extends HandledTransportAction { - private final CliServer cliServer; - private final SqlLicenseChecker sqlLicenseChecker; - - @Inject - public TransportCliAction(Settings settings, ThreadPool threadPool, - TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, - PlanExecutor planExecutor, - SqlLicenseChecker sqlLicenseChecker) { - super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new); - this.sqlLicenseChecker = sqlLicenseChecker; - this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), - () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); - } - - @Override - protected void doExecute(CliRequest cliRequest, ActionListener listener) { - sqlLicenseChecker.checkIfSqlAllowed(); - final Request request; - try { - request = cliRequest.request(); - } catch (IOException ex) { - listener.onFailure(ex); - return; - } - // NOCOMMIT we need to pass the protocol version of the client to the response here - cliServer.handle(request, chain(listener, CliResponse::new)); - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java index a0f475f6daf..5367664f275 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; import org.elasticsearch.xpack.sql.execution.PlanExecutor; -import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor; import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo; import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse; @@ -134,7 +133,7 @@ public class JdbcServer extends AbstractSqlServer { public void queryInit(QueryInitRequest req, ActionListener listener) { - final long start = System.currentTimeMillis(); + final long start = System.nanoTime(); SqlSettings sqlCfg = new SqlSettings(Settings.builder() .put(SqlSettings.PAGE_SIZE, req.fetchSize) @@ -144,17 +143,14 @@ public class JdbcServer extends AbstractSqlServer { //NOCOMMIT: this should be pushed down to the TransportSqlAction to hook up pagination executor.sql(sqlCfg, req.query, wrap(c -> { - long stop = System.currentTimeMillis(); - String requestId = EMPTY; - if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) { - requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId()); - } + long stop = System.nanoTime(); List columnInfo = c.schema().stream() .map(e -> new ColumnInfo(e.name(), e.type().sqlType(), EMPTY, EMPTY, EMPTY, EMPTY)) .collect(toList()); - listener.onResponse(new QueryInitResponse(start, stop, requestId, columnInfo, new RowSetPayload(c))); + // NOCOMMIT paging for jdbc + listener.onResponse(new QueryInitResponse(stop - start, new byte[0], columnInfo, new RowSetPayload(c))); }, ex -> listener.onResponse(exceptionResponse(req, ex)))); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java index ece0cad5843..551ce3bda99 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest; import org.elasticsearch.xpack.sql.session.Cursor; import org.joda.time.DateTimeZone; @@ -32,7 +33,7 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest } public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC; - public static final int DEFAULT_FETCH_SIZE = 1000; + public static final int DEFAULT_FETCH_SIZE = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE; private String query = ""; private DateTimeZone timeZone = DEFAULT_TIME_ZONE; @@ -41,10 +42,19 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest public SqlRequest() {} - public SqlRequest(String query, DateTimeZone timeZone, Cursor nextPageInfo) { + public SqlRequest(String query, DateTimeZone timeZone, Cursor cursor) { + if (query == null) { + throw new IllegalArgumentException("query must not be null"); + } + if (timeZone == null) { + throw new IllegalArgumentException("timeZone must not be null"); + } + if (cursor == null) { + throw new IllegalArgumentException("cursor must not be null"); + } this.query = query; this.timeZone = timeZone; - this.cursor = nextPageInfo; + this.cursor = cursor; } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java index cd75b2b5356..21ab5ad420b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.plugin.sql.action; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -24,7 +23,6 @@ import java.util.Objects; import static java.util.Collections.unmodifiableList; public class SqlResponse extends ActionResponse implements ToXContentObject { - private Cursor cursor; private long size; private int columnCount; @@ -44,10 +42,9 @@ public class SqlResponse extends ActionResponse implements ToXContentObject { /** * The key that must be sent back to SQL to access the next page of - * results. If {@link BytesReference#length()} is {@code 0} then - * there is no next page. + * results. If equal to {@link Cursor#EMPTY} then there is no next page. */ - public Cursor nextPageInfo() { + public Cursor cursor() { return cursor; } @@ -164,6 +161,9 @@ public class SqlResponse extends ActionResponse implements ToXContentObject { return Strings.toString(this); } + /** + * Information about a column. + */ public static final class ColumnInfo implements Writeable, ToXContentObject { // NOCOMMIT: we probably need to add more info about columns, but that's all we use for now private final String name; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java index bd1b86afe95..9d4d9f98fd3 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java @@ -45,6 +45,13 @@ public class TransportSqlAction extends HandledTransportAction listener) { sqlLicenseChecker.checkIfSqlAllowed(); + operation(planExecutor, request, listener); + } + + /** + * Actual implementation of the action. Statically available to support embedded mode. + */ + public static void operation(PlanExecutor planExecutor, SqlRequest request, ActionListener listener) { if (request.cursor() == Cursor.EMPTY) { SqlSettings sqlSettings = new SqlSettings(Settings.builder() .put(SqlSettings.PAGE_SIZE, request.fetchSize()) diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java index f924e62fa6e..8de489511b2 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java @@ -41,7 +41,7 @@ public class RestSqlAction extends BaseRestHandler { @Override public String getName() { - return "sql_action"; + return "xpack_sql_action"; } } diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/CliFormatterTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/CliFormatterTests.java new file mode 100644 index 00000000000..8743296bc81 --- /dev/null +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/CliFormatterTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo; +import org.elasticsearch.xpack.sql.session.Cursor; + +import java.util.Arrays; + +import static org.hamcrest.Matchers.arrayWithSize; + +public class CliFormatterTests extends ESTestCase { + private final SqlResponse firstResponse = new SqlResponse(Cursor.EMPTY, 10, 5, + Arrays.asList( + new ColumnInfo("foo", "string"), + new ColumnInfo("bar", "long"), + new ColumnInfo("15charwidename!", "double"), + new ColumnInfo("superduperwidename!!!", "double"), + new ColumnInfo("baz", "keyword")), + Arrays.asList( + Arrays.asList("15charwidedata!", 1, 6.888, 12, "rabbit"), + Arrays.asList("dog", 1.7976931348623157E308, 123124.888, 9912, "goat"))); + private final CliFormatter formatter = new CliFormatter(firstResponse); + + /** + * Tests for {@link CliFormatter#formatWithHeader(SqlResponse)}, values + * of exactly the minimum column size, column names of exactly + * the minimum column size, column headers longer than the + * minimum column size, and values longer than the minimum + * column size. + */ + public void testFormatWithHeader() { + String[] result = formatter.formatWithHeader(firstResponse).split("\n"); + assertThat(result, arrayWithSize(4)); + assertEquals(" foo | bar |15charwidename!|superduperwidename!!!| baz ", result[0]); + assertEquals("---------------+----------------------+---------------+---------------------+---------------", result[1]); + assertEquals("15charwidedata!|1 |6.888 |12 |rabbit ", result[2]); + assertEquals("dog |1.7976931348623157E308|123124.888 |9912 |goat ", result[3]); + } + + /** + * Tests for {@link CliFormatter#formatWithoutHeader(SqlResponse)} and + * truncation of long columns. + */ + public void testFormatWithoutHeader() { + String[] result = formatter.formatWithoutHeader(new SqlResponse(Cursor.EMPTY, 10, 5, null, + Arrays.asList( + Arrays.asList("ohnotruncateddata", 4, 1, 77, "wombat"), + Arrays.asList("dog", 2, 123124.888, 9912, "goat")))).split("\n"); + assertThat(result, arrayWithSize(2)); + assertEquals("ohnotruncatedd~|4 |1 |77 |wombat ", result[0]); + assertEquals("dog |2 |123124.888 |9912 |goat ", result[1]); + } + + /** + * Ensure that our estimates are perfect in at least some cases. + */ + public void testEstimateSize() { + assertEquals(formatter.formatWithHeader(firstResponse).length(), + formatter.estimateSize(firstResponse.rows().size() + 2)); + assertEquals(formatter.formatWithoutHeader(firstResponse).length(), + formatter.estimateSize(firstResponse.rows().size())); + } +} diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestTests.java deleted file mode 100644 index c12f1c7b2e6..00000000000 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliRequestTests.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.test.AbstractStreamableTestCase; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest; -import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliRequest; - -public class CliRequestTests extends AbstractStreamableTestCase { - - @Override - protected CliRequest createTestInstance() { - if (randomBoolean()) { - return new CliRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10), - randomAlphaOfLength(10), randomAlphaOfLength(10))); - } else { - return new CliRequest(new CommandRequest(randomAlphaOfLength(10))); - } - } - - @Override - protected CliRequest createBlankInstance() { - return new CliRequest(); - } -} \ No newline at end of file diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponseTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponseTests.java deleted file mode 100644 index fb7f8abd855..00000000000 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/cli/action/CliResponseTests.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.plugin.cli.action; - -import org.elasticsearch.test.AbstractStreamableTestCase; -import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse; -import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse; -import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse; - -public class CliResponseTests extends AbstractStreamableTestCase { - - @Override - protected CliResponse createTestInstance() { - if (randomBoolean()) { - return new CliResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10), - randomByte(), randomByte(), - randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10))); - } else { - return new CliResponse(new CommandResponse(randomNonNegativeLong(), randomNonNegativeLong(), - randomAlphaOfLength(10), randomAlphaOfLength(10))); - } - } - - @Override - protected CliResponse createBlankInstance() { - return new CliResponse(); - } -} \ No newline at end of file diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java index bcb83724981..d341d151987 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractProto.java @@ -38,8 +38,7 @@ public abstract class AbstractProto { public Request readRequest(DataInput in) throws IOException { int clientVersion = readHeader(in); if (clientVersion > CURRENT_VERSION) { - throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade sql last."); - // NOCOMMIT I believe we usually advise upgrading the clients *first* so this might be backwards..... + throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade client last."); } return readRequestType(in).reader().read(clientVersion, in); } diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryInitRequest.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryInitRequest.java new file mode 100644 index 00000000000..35ad8999d85 --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryInitRequest.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; +import java.util.TimeZone; + +public abstract class AbstractQueryInitRequest extends Request { + /** + * Global choice for the default fetch size. + */ + public static final int DEFAULT_FETCH_SIZE = 1000; + + public final String query; + public final int fetchSize; + public final TimeZone timeZone; + public final TimeoutInfo timeout; + + protected AbstractQueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) { + this.query = query; + this.fetchSize = fetchSize; + this.timeZone = timeZone; + this.timeout = timeout; + } + + protected AbstractQueryInitRequest(int clientVersion, DataInput in) throws IOException { + query = in.readUTF(); + fetchSize = in.readInt(); + timeZone = TimeZone.getTimeZone(in.readUTF()); + timeout = new TimeoutInfo(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(query); + out.writeInt(fetchSize); + out.writeUTF(timeZone.getID()); + timeout.write(out); + } + + @Override + protected String toStringBody() { + StringBuilder b = new StringBuilder(); + b.append("query=[").append(query).append(']'); + if (false == timeZone.getID().equals("UTC")) { + b.append(" timeZone=[").append(timeZone.getID()).append(']'); + } + return b.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + AbstractQueryInitRequest other = (AbstractQueryInitRequest) obj; + return fetchSize == other.fetchSize + && Objects.equals(query, other.query) + && Objects.equals(timeout, other.timeout) + && Objects.equals(timeZone.getID(), other.timeZone.getID()); + } + + @Override + public int hashCode() { + return Objects.hash(fetchSize, query, timeout, timeZone.getID().hashCode()); + } + +} diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java new file mode 100644 index 00000000000..e6b12368fd4 --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +public abstract class AbstractQueryPageRequest extends Request { + public final byte[] cursor; + public final TimeoutInfo timeout; + + protected AbstractQueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + if (cursor == null) { + throw new IllegalArgumentException("[cursor] must not be null"); + } + if (timeout == null) { + throw new IllegalArgumentException("[timeout] must not be null"); + } + this.cursor = cursor; + this.timeout = timeout; + } + + protected AbstractQueryPageRequest(int clientVersion, DataInput in) throws IOException { + this.cursor = new byte[ProtoUtil.readArraySize(in)]; + in.readFully(cursor); + this.timeout = new TimeoutInfo(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(cursor.length); + out.write(cursor); + timeout.write(out); + } + + @Override + protected String toStringBody() { + StringBuilder b = new StringBuilder(); + for (int i = 0; i < cursor.length; i++) { + b.append(String.format(Locale.ROOT, "%02x", cursor[i])); + } + return b.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + AbstractQueryPageRequest other = (AbstractQueryPageRequest) obj; + return Arrays.equals(cursor, other.cursor) + && timeout.equals(other.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(cursor, timeout); + } +} diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java new file mode 100644 index 00000000000..9b898832fdf --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +/** + * Superclass for responses both for {@link AbstractQueryInitRequest} + * and {@link AbstractQueryPageRequest}. + */ +public abstract class AbstractQueryResponse extends Response { + private final long tookNanos; + private final byte[] cursor; + + protected AbstractQueryResponse(long tookNanos, byte[] cursor) { + if (cursor == null) { + throw new IllegalArgumentException("cursor must not be null"); + } + this.tookNanos = tookNanos; + this.cursor = cursor; + } + + protected AbstractQueryResponse(Request request, DataInput in) throws IOException { + tookNanos = in.readLong(); + cursor = new byte[ProtoUtil.readArraySize(in)]; + in.readFully(cursor); + } + + @Override + protected void write(int clientVersion, DataOutput out) throws IOException { + out.writeLong(tookNanos); + out.writeInt(cursor.length); + out.write(cursor); + } + + /** + * How long the request took on the server as measured by + * {@link System#nanoTime()}. + */ + public long tookNanos() { + return tookNanos; + } + + /** + * Cursor for fetching the next page. If it has {@code length = 0} + * then there is no next page. + */ + public byte[] cursor() { + return cursor; + } + + @Override + protected String toStringBody() { + StringBuilder b = new StringBuilder(); + b.append("tookNanos=[").append(tookNanos); + b.append("] cursor=["); + for (int i = 0; i < cursor.length; i++) { + b.append(String.format(Locale.ROOT, "%02x", cursor[i])); + } + b.append("]"); + return b.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + AbstractQueryResponse other = (AbstractQueryResponse) obj; + return tookNanos == other.tookNanos + && Arrays.equals(cursor, other.cursor); + } + + @Override + public int hashCode() { + return Objects.hash(tookNanos, Arrays.hashCode(cursor)); + } +} diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/ProtoUtil.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/ProtoUtil.java new file mode 100644 index 00000000000..eb143ee1d8e --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/ProtoUtil.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.DataInput; +import java.io.IOException; + +public class ProtoUtil { + private static final int MAX_ARRAY_SIZE = 5 * 1024 * 1024 * 1024; + + public static int readArraySize(DataInput in) throws IOException { + int length = in.readInt(); + if (length > MAX_ARRAY_SIZE) { + throw new IOException("array size unbelievably long [" + length + "]"); + } + return length; + } +} diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfo.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfo.java similarity index 96% rename from sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfo.java rename to sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfo.java index 1ed1c0c2634..b1004c99b5f 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfo.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfo.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.jdbc.net.protocol; +package org.elasticsearch.xpack.sql.protocol.shared; import java.io.DataInput; import java.io.DataOutput; diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfoTests.java b/sql/shared-proto/src/test/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfoTests.java similarity index 93% rename from sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfoTests.java rename to sql/shared-proto/src/test/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfoTests.java index acc075826db..85a6897b723 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/TimeoutInfoTests.java +++ b/sql/shared-proto/src/test/java/org/elasticsearch/xpack/sql/protocol/shared/TimeoutInfoTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.jdbc.net.protocol; +package org.elasticsearch.xpack.sql.protocol.shared; import org.elasticsearch.test.ESTestCase; diff --git a/sql/test-utils/src/main/java/org/elasticsearch/xpack/sql/test/server/ProtoHandler.java b/sql/test-utils/src/main/java/org/elasticsearch/xpack/sql/test/server/ProtoHandler.java index 62c172e4a4f..1fcf42fdbd5 100644 --- a/sql/test-utils/src/main/java/org/elasticsearch/xpack/sql/test/server/ProtoHandler.java +++ b/sql/test-utils/src/main/java/org/elasticsearch/xpack/sql/test/server/ProtoHandler.java @@ -26,12 +26,14 @@ public abstract class ProtoHandler implements HttpHandler, AutoCloseable { protected static final Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName()); private final TimeValue TV = TimeValue.timeValueSeconds(5); + protected final Client client; protected final NodeInfo info; protected final String clusterName; private final CheckedFunction toProto; protected ProtoHandler(Client client, CheckedFunction toProto) { NodesInfoResponse niResponse = client.admin().cluster().prepareNodesInfo("_local").clear().get(TV); + this.client = client; info = niResponse.getNodes().get(0); clusterName = niResponse.getClusterName().value();