Support scrolling in SQL's CLI (elastic/x-pack-elasticsearch#2494)

* Move CLI to TransportSqlAction
    * Moves REST endpoint from `/_cli` to `/_sql/cli`
    * Removes the special purpose CLI transport action instead
    implements the CLI entirely on the REST layer, delegating
    all SQL stuff to the same action that backs the `/_sql` REST
    API.
    * Reworks "embedded testing mode" to use a `FilterClient` to
    bounce capture the sql transport action and execute in embedded.
    * Switches CLI formatting from consuming the entire response
    to consuming just the first page of the response and returning
    a `cursor` that can be used to read the next page. That read is
    not yet implemented.
    * Switch CLI formatting from the consuming the `RowSetCursor` to
    consuming the `SqlResponse` object.
    * Adds tests for CLI formatting.
* Support next page in the cli
    * Rename cli's CommandRequest/CommandResponse to
    QueryInitRequest/QueryInitResponse to line up with jdbc
    * Implement QueryPageRequest/QueryPageResponse in cli
    * Use `byte[]` to represent the cursor in the cli. Those bytes
    mean something, but only to the server. The only reasonint that
    the client does about them is "if length == 0 then there isn't a
    next page."
    * Pull common code from jdbc's QueryInitRequest, QueryPageRequest,
    QueryInitResponse, and QueryPageResponse into the shared-proto
    project
        * By implication this switches jdbc's QueryPageRequest to using
     the same cursor implementation as the cli

Original commit: elastic/x-pack-elasticsearch@193586f1ee
This commit is contained in:
Nik Everett 2017-09-14 10:26:42 -04:00 committed by GitHub
parent 6c0de3689f
commit 5d3f5cc4f8
69 changed files with 1347 additions and 1239 deletions

View File

@ -64,7 +64,6 @@ import org.elasticsearch.xpack.security.user.AnonymousUser;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
@ -485,8 +484,7 @@ public class AuthorizationService extends AbstractComponent {
*/
private static boolean isDelayedIndicesAction(String action) {
return action.equals(SqlAction.NAME) ||
action.equals(JdbcAction.NAME) ||
action.equals(CliAction.NAME);
action.equals(JdbcAction.NAME);
}
private static boolean isTranslatedToBulkAction(String action) {

View File

@ -1,35 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
public class CliActionIT extends AbstractSqlIntegTestCase {
public void testCliAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42))
.add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
Request request = new CommandRequest("SELECT * FROM test ORDER BY count");
CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get();
assertThat(response.response(request).toString(), containsString("bar"));
assertThat(response.response(request).toString(), containsString("baz"));
}
}

View File

@ -12,11 +12,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.AbstractLicensesIntegrationTestCase;
import org.elasticsearch.license.License;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
@ -32,7 +29,6 @@ import static org.elasticsearch.license.XPackLicenseStateTests.randomBasicStanda
import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialBasicStandardGoldOrPlatinumMode;
import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialOrPlatinumMode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo;
@ -103,22 +99,6 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
assertThat(response.size(), Matchers.equalTo(2L));
}
public void testCliActionLicense() throws Exception {
setupTestIndex();
disableSqlLicensing();
Request request = new CommandRequest("SELECT * FROM test");
ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class,
() -> client().prepareExecute(CliAction.INSTANCE).request(request).get());
assertThat(e.getMessage(), equalTo("current license is non-compliant for [sql]"));
enableSqlLicensing();
CliResponse response = client().prepareExecute(CliAction.INSTANCE).request(request).get();
assertThat(response.response(request).toString(), containsString("bar"));
assertThat(response.response(request).toString(), containsString("baz"));
}
public void testJdbcActionLicense() throws Exception {
setupTestIndex();
disableJdbcLicensing();

View File

@ -1,55 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class CommandRequest extends Request {
public final String command;
public CommandRequest(String command) {
this.command = command;
}
CommandRequest(int clientVersion, DataInput in) throws IOException {
command = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(command);
}
@Override
protected String toStringBody() {
return command;
}
@Override
public RequestType requestType() {
return RequestType.COMMAND;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
CommandRequest other = (CommandRequest) obj;
return Objects.equals(command, other.command);
}
@Override
public int hashCode() {
return Objects.hash(command);
}
}

View File

@ -1,80 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class CommandResponse extends Response {
public final long serverTimeQueryReceived, serverTimeResponseSent;
public final String requestId;
public final String data;
public CommandResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, String data) {
this.serverTimeQueryReceived = serverTimeQueryReceived;
this.serverTimeResponseSent = serverTimeResponseSent;
this.requestId = requestId;
this.data = data;
}
CommandResponse(Request request, DataInput in) throws IOException {
serverTimeQueryReceived = in.readLong();
serverTimeResponseSent = in.readLong();
requestId = in.readUTF();
data = in.readUTF();
}
@Override
protected void write(int clientVersion, DataOutput out) throws IOException {
out.writeLong(serverTimeQueryReceived);
out.writeLong(serverTimeResponseSent);
out.writeUTF(requestId);
out.writeUTF(data);
}
@Override
protected String toStringBody() {
return "received=[" + serverTimeQueryReceived
+ "] sent=[" + serverTimeResponseSent
+ "] requestId=[" + requestId
+ "] data=[" + data + "]";
}
@Override
public RequestType requestType() {
return RequestType.COMMAND;
}
@Override
public ResponseType responseType() {
return ResponseType.COMMAND;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
CommandResponse other = (CommandResponse) obj;
return serverTimeQueryReceived == other.serverTimeQueryReceived
&& serverTimeResponseSent == other.serverTimeResponseSent
&& Objects.equals(requestId, other.requestId)
&& Objects.equals(data, other.data);
}
@Override
public int hashCode() {
return Objects.hash(serverTimeQueryReceived, serverTimeResponseSent, requestId, data);
}
}

View File

@ -32,7 +32,8 @@ public final class Proto extends AbstractProto {
public enum RequestType implements AbstractProto.RequestType {
INFO(InfoRequest::new),
COMMAND(CommandRequest::new);
QUERY_INIT(QueryInitRequest::new),
QUERY_PAGE(QueryPageRequest::new);
private final RequestReader reader;
@ -64,7 +65,8 @@ public final class Proto extends AbstractProto {
EXCEPTION(ExceptionResponse::new),
ERROR(ErrorResponse::new),
INFO(InfoResponse::new),
COMMAND(CommandResponse::new);
QUERY_INIT(QueryInitResponse::new),
QUERY_PAGE(QueryPageResponse::new);
private final ResponseReader reader;

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.DataInput;
import java.io.IOException;
import java.util.TimeZone;
/**
* Request to start a query.
*/
public class QueryInitRequest extends AbstractQueryInitRequest {
public QueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) {
super(query, fetchSize, timeZone, timeout);
}
QueryInitRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_INIT;
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
public class QueryInitResponse extends QueryResponse {
public QueryInitResponse(long tookNanos, byte[] cursor, String data) {
super(tookNanos, cursor, data);
}
QueryInitResponse(Request request, DataInput in) throws IOException {
super(request, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_INIT;
}
@Override
public ResponseType responseType() {
return ResponseType.QUERY_INIT;
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.DataInput;
import java.io.IOException;
public class QueryPageRequest extends AbstractQueryPageRequest {
public QueryPageRequest(byte[] cursor, TimeoutInfo timeout) {
super(cursor, timeout);
}
QueryPageRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_PAGE;
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
public class QueryPageResponse extends QueryResponse {
public QueryPageResponse(long tookNanos, byte[] cursor, String data) {
super(tookNanos, cursor, data);
}
QueryPageResponse(Request request, DataInput in) throws IOException {
super(request, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_PAGE;
}
@Override
public ResponseType responseType() {
return ResponseType.QUERY_PAGE;
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public abstract class QueryResponse extends AbstractQueryResponse {
public final String data;
protected QueryResponse(long tookNanos, byte[] cursor, String data) {
super(tookNanos, cursor);
if (data == null) {
throw new IllegalArgumentException("data cannot be null");
}
this.data = data;
}
protected QueryResponse(Request request, DataInput in) throws IOException {
super(request, in);
data = in.readUTF();
}
@Override
protected void write(int clientVersion, DataOutput out) throws IOException {
super.write(clientVersion, out);
out.writeUTF(data);
}
@Override
protected String toStringBody() {
return super.toStringBody() + " data=[" + data + "]";
}
@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
}
QueryResponse other = (QueryResponse) obj;
return data.equals(other.data);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), data);
}
}

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.test.RoundTripTestUtils;
import java.io.IOException;
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
public final class CliRoundTripTestUtils {
private CliRoundTripTestUtils() {
// Just static utilities
@ -25,4 +28,8 @@ public final class CliRoundTripTestUtils {
(r, out) -> Proto.INSTANCE.writeResponse(r, Proto.CURRENT_VERSION, out),
in -> Proto.INSTANCE.readResponse(request, in));
}
static TimeoutInfo randomTimeoutInfo() {
return new TimeoutInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class CommandRequestTests extends ESTestCase {
static CommandRequest randomCommandRequest() {
return new CommandRequest(randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomCommandRequest());
}
public void testToString() {
assertEquals("CommandRequest<test>", new CommandRequest("test").toString());
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest;
public class CommandResponseTests extends ESTestCase {
static CommandResponse randomCommandResponse() {
long start = randomNonNegativeLong();
long end = randomValueOtherThanMany(l -> l >= start, ESTestCase::randomNonNegativeLong);
return new CommandResponse(start, end, randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomCommandRequest(), randomCommandResponse());
}
public void testToString() {
assertEquals("CommandResponse<received=[123] sent=[332] requestId=[rid] data=[test]>",
new CommandResponse(123, 332, "rid", "test").toString());
}
}

View File

@ -11,20 +11,20 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest;
public class ErrorResponseTests extends ESTestCase {
static ErrorResponse randomErrorResponse() {
return new ErrorResponse(RequestType.COMMAND, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
return new ErrorResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomCommandRequest(), randomErrorResponse());
assertRoundTripCurrentVersion(randomQueryInitRequest(), randomErrorResponse());
}
public void testToString() {
assertEquals("ErrorResponse<request=[COMMAND] message=[test] cause=[test] stack=[stack\nstack]>",
new ErrorResponse(RequestType.COMMAND, "test", "test", "stack\nstack").toString());
assertEquals("ErrorResponse<request=[QUERY_INIT] message=[test] cause=[test] stack=[stack\nstack]>",
new ErrorResponse(RequestType.QUERY_INIT, "test", "test", "stack\nstack").toString());
}
}

View File

@ -12,21 +12,21 @@ import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionTyp
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequestTests.randomCommandRequest;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest;
public class ExceptionResponseTests extends ESTestCase {
static ExceptionResponse randomExceptionResponse() {
return new ExceptionResponse(RequestType.COMMAND, randomAlphaOfLength(5), randomAlphaOfLength(5),
return new ExceptionResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5),
randomFrom(SqlExceptionType.values()));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomCommandRequest(), randomExceptionResponse());
assertRoundTripCurrentVersion(randomQueryInitRequest(), randomExceptionResponse());
}
public void testToString() {
assertEquals("ExceptionResponse<request=[COMMAND] message=[test] cause=[test] type=[SYNTAX]>",
new ExceptionResponse(RequestType.COMMAND, "test", "test", SqlExceptionType.SYNTAX).toString());
assertEquals("ExceptionResponse<request=[QUERY_INIT] message=[test] cause=[test] type=[SYNTAX]>",
new ExceptionResponse(RequestType.QUERY_INIT, "test", "test", SqlExceptionType.SYNTAX).toString());
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import java.util.TimeZone;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.randomTimeoutInfo;
public class QueryInitRequestTests extends ESTestCase {
static QueryInitRequest randomQueryInitRequest() {
return new QueryInitRequest(randomAlphaOfLength(5), between(0, Integer.MAX_VALUE), randomTimeZone(random()), randomTimeoutInfo());
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryInitRequest());
}
public void testToString() {
assertEquals("QueryInitRequest<query=[SELECT * FROM test.doc]>",
new QueryInitRequest("SELECT * FROM test.doc", 10, TimeZone.getTimeZone("UTC"), new TimeoutInfo(1, 1, 1)).toString());
assertEquals("QueryInitRequest<query=[SELECT * FROM test.doc] timeZone=[GMT-05:00]>",
new QueryInitRequest("SELECT * FROM test.doc", 10, TimeZone.getTimeZone("GMT-5"), new TimeoutInfo(1, 1, 1)).toString());
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest;
public class QueryInitResponseTests extends ESTestCase {
static QueryInitResponse randomQueryInitResponse() {
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
return new QueryInitResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryInitRequest(), randomQueryInitResponse());
}
public void testToString() {
assertEquals("QueryInitResponse<tookNanos=[123] cursor=[0103] data=[test]>",
new QueryInitResponse(123, new byte[] {0x01, 0x03}, "test").toString());
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.randomTimeoutInfo;
public class QueryPageRequestTests extends ESTestCase {
static QueryPageRequest randomQueryPageRequest() {
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
return new QueryPageRequest(cursor, randomTimeoutInfo());
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryPageRequest());
}
public void testToString() {
assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1)).toString());
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequestTests.randomQueryPageRequest;
public class QueryPageResponseTests extends ESTestCase {
static QueryPageResponse randomQueryPageResponse() {
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
return new QueryPageResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryPageRequest(), randomQueryPageResponse());
}
public void testToString() {
assertEquals("QueryPageResponse<tookNanos=[123] cursor=[0103] data=[test]>",
new QueryPageResponse(123, new byte[] {0x01, 0x03}, "test").toString());
}
}

View File

@ -5,9 +5,10 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.client.CliHttpClient;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.net.client.SuppressForbidden;
import org.elasticsearch.xpack.sql.net.client.util.IOUtils;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.jline.keymap.BindingReader;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
@ -25,6 +26,8 @@ import java.io.PrintWriter;
import java.util.Locale;
import java.util.Properties;
import java.util.logging.LogManager;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.jline.utils.AttributedStyle.BOLD;
import static org.jline.utils.AttributedStyle.BRIGHT;
@ -39,7 +42,7 @@ public class Cli {
LogManager.getLogManager().readConfiguration(Cli.class.getResourceAsStream("/logging.properties"));
try (Terminal term = TerminalBuilder.builder().build()) {
try {
Cli console = new Cli(new CliConfiguration("localhost:9200/_cli", new Properties()), term);
Cli console = new Cli(new CliConfiguration("localhost:9200/_sql/cli", new Properties()), term);
console.run();
} catch (FatalException e) {
term.writer().println(e.getMessage());
@ -58,9 +61,12 @@ public class Cli {
private final Keys keys;
private final CliConfiguration cfg;
private final CliHttpClient cliClient;
private int fetchSize = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE;
private String fetchSeparator = "";
Cli(CliConfiguration cfg, Terminal terminal) {
term = terminal;
// NOCOMMIT figure out if we need to build these for side effects or not. We don't currently use them.
bindingReader = new BindingReader(term.reader());
keys = new Keys(term);
@ -85,7 +91,7 @@ public class Cli {
prompt = DEFAULT_PROMPT;
out.flush();
printLogo(out);
printLogo();
while (true) {
String line = null;
@ -118,9 +124,6 @@ public class Cli {
line = multiLine.toString().trim();
multiLine.setLength(0);
}
//
// local commands
//
// special case to handle exit
if (isExit(line)) {
@ -128,23 +131,16 @@ public class Cli {
out.flush();
return;
}
if (isClear(line)) {
term.puts(Capability.clear_screen);
}
else if (isLogo(line)) {
printLogo(out);
}
else {
boolean wasLocal = handleLocalCommand(line);
if (false == wasLocal) {
try {
if (isServerInfo(line)) {
executeServerInfo(out);
}
else {
executeCommand(line, out);
executeServerInfo();
} else {
executeQuery(line);
}
} catch (RuntimeException e) {
handleExceptionWhileCommunicatingWithServer(out, e);
handleExceptionWhileCommunicatingWithServer(e);
}
out.println();
}
@ -157,12 +153,12 @@ public class Cli {
* Handle an exception while communication with the server. Extracted
* into a method so that tests can bubble the failure.
*/
protected void handleExceptionWhileCommunicatingWithServer(PrintWriter out, RuntimeException e) {
protected void handleExceptionWhileCommunicatingWithServer(RuntimeException e) {
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("Communication error [", BOLD.foreground(RED));
asb.append(e.getMessage(), DEFAULT.boldOff().italic().foreground(YELLOW));
asb.append("]", BOLD.underlineOff().foreground(RED));
out.println(asb.toAnsi(term));
term.writer().println(asb.toAnsi(term));
}
private static String logo() {
@ -176,38 +172,96 @@ public class Cli {
}
}
private void printLogo(PrintWriter out) {
private void printLogo() {
term.puts(Capability.clear_screen);
out.println(logo());
out.println();
term.writer().println(logo());
term.writer().println();
}
private static final Pattern LOGO_PATTERN = Pattern.compile("logo", Pattern.CASE_INSENSITIVE);
private static final Pattern CLEAR_PATTERN = Pattern.compile("cls", Pattern.CASE_INSENSITIVE);
private static final Pattern FETCH_SIZE_PATTERN = Pattern.compile("fetch(?: |_)size *= *(.+)", Pattern.CASE_INSENSITIVE);
private static final Pattern FETCH_SEPARATOR_PATTERN = Pattern.compile("fetch(?: |_)separator *= *\"(.+)\"", Pattern.CASE_INSENSITIVE);
private boolean handleLocalCommand(String line) {
Matcher m = LOGO_PATTERN.matcher(line);
if (m.matches()) {
printLogo();
return true;
}
m = CLEAR_PATTERN.matcher(line);
if (m.matches()) {
term.puts(Capability.clear_screen);
return true;
}
m = FETCH_SIZE_PATTERN.matcher(line);
if (m.matches()) {
int proposedFetchSize;
try {
proposedFetchSize = fetchSize = Integer.parseInt(m.group(1));
} catch (NumberFormatException e) {
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("Invalid fetch size [", BOLD.foreground(RED));
asb.append(m.group(1), DEFAULT.boldOff().italic().foreground(YELLOW));
asb.append("]", BOLD.underlineOff().foreground(RED));
term.writer().println(asb.toAnsi(term));
return true;
}
if (proposedFetchSize <= 0) {
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("Invalid fetch size [", BOLD.foreground(RED));
asb.append(m.group(1), DEFAULT.boldOff().italic().foreground(YELLOW));
asb.append("]. Must be > 0.", BOLD.underlineOff().foreground(RED));
term.writer().println(asb.toAnsi(term));
return true;
}
this.fetchSize = proposedFetchSize;
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("fetch size set to ", DEFAULT);
asb.append(Integer.toString(fetchSize), DEFAULT.foreground(BRIGHT));
term.writer().println(asb.toAnsi(term));
return true;
}
m = FETCH_SEPARATOR_PATTERN.matcher(line);
if (m.matches()) {
fetchSeparator = m.group(1);
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("fetch separator set to \"", DEFAULT);
asb.append(fetchSeparator, DEFAULT.foreground(BRIGHT));
asb.append("\"", DEFAULT);
term.writer().println(asb.toAnsi(term));
return true;
}
return false;
}
private static boolean isClear(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("cls"));
}
private boolean isServerInfo(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("info"));
return line.equals("info");
}
private boolean isLogo(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("logo"));
}
private void executeServerInfo(PrintWriter out) {
out.println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term));
private void executeServerInfo() {
term.writer().println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term));
}
private static boolean isExit(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("exit") || line.equals("quit"));
return line.equals("exit") || line.equals("quit");
}
protected void executeCommand(String line, PrintWriter out) throws IOException {
out.print(ResponseToString.toAnsi(cliClient.command(line, null)).toAnsi(term));
private void executeQuery(String line) throws IOException {
QueryResponse response = cliClient.queryInit(line, fetchSize);
while (true) {
term.writer().print(ResponseToString.toAnsi(response).toAnsi(term));
term.writer().flush();
if (response.cursor().length == 0) {
return;
}
if (false == fetchSeparator.equals("")) {
term.writer().println(fetchSeparator);
}
response = cliClient.nextPage(response.cursor());
}
}
static class FatalException extends RuntimeException {

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.TimeZone;
public class CliHttpClient implements AutoCloseable {
private final HttpClient http;
public CliHttpClient(CliConfiguration cfg) {
http = new HttpClient(cfg);
}
public InfoResponse serverInfo() {
InfoRequest request = new InfoRequest();
return (InfoResponse) sendRequest(request);
}
public QueryInitResponse queryInit(String query, int fetchSize) {
// TODO allow customizing the time zone
// NOCOMMIT figure out Timeouts....
QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), new TimeoutInfo(0, 0, 0));
return (QueryInitResponse) sendRequest(request);
}
public QueryPageResponse nextPage(byte[] cursor) {
// NOCOMMIT figure out Timeouts....
QueryPageRequest request = new QueryPageRequest(cursor, new TimeoutInfo(0, 0, 0));
return (QueryPageResponse) sendRequest(request);
}
private Response sendRequest(Request request) {
Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out));
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ba.bytes(), 0, ba.size()))) {
return Proto.INSTANCE.readResponse(request, in);
} catch (IOException ex) {
throw new CliException(ex, "Cannot read response");
}
}
public void close() {}
}

View File

@ -3,9 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.client;
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.net.client.ClientException;
import org.elasticsearch.xpack.sql.net.client.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
@ -32,8 +32,8 @@ abstract class ResponseToString {
static AttributedStringBuilder toAnsi(Response response) {
AttributedStringBuilder sb = new AttributedStringBuilder();
if (response instanceof CommandResponse) {
CommandResponse cmd = (CommandResponse) response;
if (response instanceof QueryResponse) {
QueryResponse cmd = (QueryResponse) response;
if (cmd.data != null) {
String data = cmd.data.toString();
if (data.startsWith("digraph ")) {

View File

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.client;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.CliException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
public class CliHttpClient implements AutoCloseable {
private final HttpClient http;
public CliHttpClient(CliConfiguration cfg) {
http = new HttpClient(cfg);
}
public Response serverInfo() {
InfoRequest request = new InfoRequest();
Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out));
return doIO(ba, in -> Proto.INSTANCE.readResponse(request, in));
}
public Response command(String command, String requestId) {
CommandRequest request = new CommandRequest(command);
Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out));
return doIO(ba, in -> Proto.INSTANCE.readResponse(request, in));
}
private static <T> T doIO(Bytes ba, DataInputFunction<T> action) {
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ba.bytes(), 0, ba.size()))) {
return action.apply(in);
} catch (IOException ex) {
throw new CliException(ex, "Cannot read response");
}
}
public void close() {}
@FunctionalInterface
private interface DataInputFunction<R> {
R apply(DataInput in) throws IOException;
}
}

View File

@ -6,15 +6,15 @@
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xpack.sql.test.server.ProtoHttpServer;
/**
* Internal server used for testing without starting a new Elasticsearch instance.
*/
public class CliHttpServer extends ProtoHttpServer<Response> {
public class CliHttpServer extends ProtoHttpServer<BytesReference> {
public CliHttpServer(Client client) {
super(client, new CliProtoHandler(client), "/_cli");
super(client, new CliProtoHandler(client), "/_sql/cli");
}
@Override

View File

@ -67,7 +67,7 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
@ClassRule
public static final Supplier<CliConfiguration> ES = EMBED_SQL ? new EmbeddedCliServer() : () ->
new CliConfiguration(System.getProperty("tests.rest.cluster") + "/_cli", new Properties());
new CliConfiguration(System.getProperty("tests.rest.cluster") + "/_sql/cli", new Properties());
protected PrintWriter out;
protected BufferedReader in;
@ -93,7 +93,7 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
terminal.echo(false);
Cli cli = new Cli(ES.get(), terminal) {
@Override
protected void handleExceptionWhileCommunicatingWithServer(PrintWriter out, RuntimeException e) {
protected void handleExceptionWhileCommunicatingWithServer(RuntimeException e) {
throw e;
}
};

View File

@ -8,33 +8,35 @@ package org.elasticsearch.xpack.sql.cli;
import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.sql.TestUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.plugin.cli.CliServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.plugin.RestSqlCliAction;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.test.server.ProtoHandler;
import java.io.DataInput;
import java.io.IOException;
import static org.elasticsearch.action.ActionListener.wrap;
class CliProtoHandler extends ProtoHandler<BytesReference> {
private final NamedWriteableRegistry cursorRegistry = new NamedWriteableRegistry(Cursor.getNamedWriteables());
class CliProtoHandler extends ProtoHandler<Response> {
private final CliServer server;
CliProtoHandler(Client client) {
super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response));
this.server = new CliServer(TestUtils.planExecutor(client), clusterName,
() -> info.getNode().getName(), info.getVersion(), info.getBuild());
super(new EmbeddedModeFilterClient(client, TestUtils.planExecutor(client)), r -> r);
}
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
Request req = Proto.INSTANCE.readRequest(in);
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex)));
FakeRestChannel channel = new FakeRestChannel(new FakeRestRequest(), true, 1);
try {
RestSqlCliAction.operation(cursorRegistry, Proto.INSTANCE.readRequest(in), client).accept(channel);
while (false == channel.await()) {}
sendHttpResponse(http, channel.capturedResponse().content());
} catch (Exception e) {
fail(http, e);
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
/**
* Implements embedded sql mode by intercepting requests to SQL APIs and executing them locally.
*/
public class EmbeddedModeFilterClient extends FilterClient {
private final PlanExecutor planExecutor;
public EmbeddedModeFilterClient(Client in, PlanExecutor planExecutor) {
super(in);
this.planExecutor = planExecutor;
}
@Override
@SuppressWarnings("unchecked")
protected < Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action,
Request request, ActionListener<Response> listener) {
if (action == SqlAction.INSTANCE) {
TransportSqlAction.operation(planExecutor, (SqlRequest) request, (ActionListener<SqlResponse>) listener);
} else {
super.doExecute(action, request, listener);
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import java.io.IOException;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
/**
* Test for setting the fetch size.
*/
public class FetchSizeIT extends CliIntegrationTestCase {
public void testSelect() throws IOException {
StringBuilder bulk = new StringBuilder();
for (int i = 0; i < 20; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"test_field\":" + i + "}\n");
}
client().performRequest("PUT", "/test/doc/_bulk", singletonMap("refresh", "true"),
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
command("fetch size = 4");
assertEquals("fetch size set to 4", in.readLine());
command("fetch separator = \" -- fetch sep -- \"");
assertEquals("fetch separator set to \" -- fetch sep -- \"", in.readLine());
command("SELECT * FROM test ORDER BY test_field ASC");
assertThat(in.readLine(), containsString("test_field"));
assertThat(in.readLine(), containsString("----------"));
int i = 0;
while (i < 20) {
assertThat(in.readLine(), containsString(Integer.toString(i++)));
assertThat(in.readLine(), containsString(Integer.toString(i++)));
assertThat(in.readLine(), containsString(Integer.toString(i++)));
assertThat(in.readLine(), containsString(Integer.toString(i++)));
assertEquals(" -- fetch sep -- ", in.readLine());
}
assertEquals("", in.readLine());
}
public void testInvalidFetchSize() throws IOException {
command("fetch size = cat");
assertEquals("Invalid fetch size [cat]", in.readLine());
command("fetch size = 0");
assertEquals("Invalid fetch size [0]. Must be > 0.", in.readLine());
command("fetch size = -1231");
assertEquals("Invalid fetch size [-1231]. Must be > 0.", in.readLine());
command("fetch size = " + Long.MAX_VALUE);
assertEquals("Invalid fetch size [" + Long.MAX_VALUE + "]", in.readLine());
}
}

View File

@ -6,9 +6,10 @@
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedStringBuilder;
@ -17,8 +18,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ResponseToStringTests extends ESTestCase {
public void testCommandResponse() {
AttributedStringBuilder s = ResponseToString.toAnsi(new CommandResponse(123, 223, "test", "some command response"));
public void testQueryInitResponse() {
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, new byte[0], "some command response"));
assertEquals("some command response", unstyled(s));
assertEquals("[37msome command response[0m", fullyStyled(s));
}
public void testQueryPageResponse() {
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, new byte[0], "some command response"));
assertEquals("some command response", unstyled(s));
assertEquals("[37msome command response[0m", fullyStyled(s));
}

View File

@ -6,71 +6,24 @@
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import java.util.TimeZone;
public class QueryInitRequest extends Request {
public final String query;
public final int fetchSize;
public final TimeZone timeZone;
public final TimeoutInfo timeout;
public class QueryInitRequest extends AbstractQueryInitRequest {
public QueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) {
this.query = query;
this.fetchSize = fetchSize;
this.timeZone = timeZone;
this.timeout = timeout;
super(query, fetchSize, timeZone, timeout);
}
QueryInitRequest(int clientVersion, DataInput in) throws IOException {
query = in.readUTF();
fetchSize = in.readInt();
timeZone = TimeZone.getTimeZone(in.readUTF());
timeout = new TimeoutInfo(in);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(query);
out.writeInt(fetchSize);
out.writeUTF(timeZone.getID());
timeout.write(out);
}
@Override
protected String toStringBody() {
StringBuilder b = new StringBuilder();
b.append("query=[").append(query).append(']');
if (false == timeZone.getID().equals("UTC")) {
b.append(" timeZone=[").append(timeZone.getID()).append(']');
}
return b.toString();
super(clientVersion, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_INIT;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
QueryInitRequest other = (QueryInitRequest) obj;
return fetchSize == other.fetchSize
&& Objects.equals(query, other.query)
&& Objects.equals(timeout, other.timeout)
&& Objects.equals(timeZone.getID(), other.timeZone.getID());
}
@Override
public int hashCode() {
return Objects.hash(fetchSize, query, timeout, timeZone.getID().hashCode());
}
}

View File

@ -7,8 +7,8 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.DataInput;
import java.io.DataOutput;
@ -19,32 +19,25 @@ import java.util.Objects;
import static java.util.Collections.unmodifiableList;
public class QueryInitResponse extends Response {
public final long serverTimeQueryReceived, serverTimeResponseSent;
public final String requestId;
public class QueryInitResponse extends AbstractQueryResponse {
public final List<ColumnInfo> columns;
public final Payload data;
public QueryInitResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, List<ColumnInfo> columns,
Payload data) {
this.serverTimeQueryReceived = serverTimeQueryReceived;
this.serverTimeResponseSent = serverTimeResponseSent;
this.requestId = requestId;
public QueryInitResponse(long tookNanos, byte[] cursor, List<ColumnInfo> columns, Payload data) {
super(tookNanos, cursor);
this.columns = columns;
this.data = data;
}
QueryInitResponse(Request request, DataInput in) throws IOException {
serverTimeQueryReceived = in.readLong();
serverTimeResponseSent = in.readLong();
requestId = in.readUTF();
super(request, in);
int size = in.readInt();
List<ColumnInfo> columns = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
columns.add(new ColumnInfo(in));
}
this.columns = unmodifiableList(columns);
//NOCOMMIT - Page is a client class, it shouldn't leak here
// NOCOMMIT - Page is a client class, it shouldn't leak here
Page data = new Page(columns);
data.read(in);
this.data = data;
@ -52,9 +45,7 @@ public class QueryInitResponse extends Response {
@Override
public void write(int clientVersion, DataOutput out) throws IOException {
out.writeLong(serverTimeQueryReceived);
out.writeLong(serverTimeResponseSent);
out.writeUTF(requestId);
super.write(clientVersion, out);
out.writeInt(columns.size());
for (ColumnInfo c : columns) {
c.write(out);
@ -64,10 +55,8 @@ public class QueryInitResponse extends Response {
@Override
protected String toStringBody() {
return "timeReceived=[" + serverTimeQueryReceived
+ "] timeSent=[" + serverTimeResponseSent
+ "] requestId=[" + requestId
+ "] columns=" + columns
return super.toStringBody()
+ " columns=" + columns
+ " data=[\n" + data + "]";
}
@ -83,19 +72,16 @@ public class QueryInitResponse extends Response {
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
if (false == super.equals(obj)) {
return false;
}
QueryInitResponse other = (QueryInitResponse) obj;
return serverTimeQueryReceived == other.serverTimeQueryReceived
&& serverTimeResponseSent == other.serverTimeResponseSent
&& requestId.equals(other.requestId)
&& columns.equals(other.columns);
return columns.equals(other.columns);
// NOCOMMIT data
}
@Override
public int hashCode() {
return Objects.hash(serverTimeQueryReceived, serverTimeResponseSent, requestId, columns); // NOCOMMIT data
return Objects.hash(super.hashCode(), columns); // NOCOMMIT data
}
}

View File

@ -6,71 +6,34 @@
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Nullable;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class QueryPageRequest extends Request {
public final String requestId;
public final TimeoutInfo timeout;
public class QueryPageRequest extends AbstractQueryPageRequest {
private final transient Payload data;
public QueryPageRequest(String requestId, TimeoutInfo timeout, @Nullable Payload data) {
if (requestId == null) {
throw new IllegalArgumentException("[requestId] must not be null");
}
if (timeout == null) {
throw new IllegalArgumentException("[timeout] must not be null");
}
this.requestId = requestId;
this.timeout = timeout;
public QueryPageRequest(byte[] cursor, TimeoutInfo timeout, @Nullable Payload data) {
super(cursor, timeout);
this.data = data;
}
QueryPageRequest(int clientVersion, DataInput in) throws IOException {
this.requestId = in.readUTF();
this.timeout = new TimeoutInfo(in);
this.data = null; // Data isn't used on the server side
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(requestId);
timeout.write(out);
super(clientVersion, in);
this.data = null; // data isn't used on the server side
}
public Payload data() {
return data;
}
@Override
protected String toStringBody() {
return requestId;
}
@Override
public RequestType requestType() {
return RequestType.QUERY_PAGE;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
QueryPageRequest other = (QueryPageRequest) obj;
return requestId.equals(other.requestId)
&& timeout.equals(other.timeout);
// data is intentionally ignored
}
@Override
public int hashCode() {
return Objects.hash(requestId, timeout);
// data is intentionally ignored
}
// not overriding hashCode and equals because we're intentionally ignore the data field
}

View File

@ -7,31 +7,24 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class QueryPageResponse extends Response {
public final String requestId;
public class QueryPageResponse extends AbstractQueryResponse {
private final Payload data;
public QueryPageResponse(String requestId, Page data) {
if (requestId == null) {
throw new IllegalArgumentException("[requestId] must not be null");
}
if (data == null) {
throw new IllegalArgumentException("[data] must not be null");
}
this.requestId = requestId;
public QueryPageResponse(long tookNanos, byte[] cursor, Payload data) {
super(tookNanos, cursor);
this.data = data;
}
QueryPageResponse(Request request, DataInput in) throws IOException {
this.requestId = in.readUTF();
super(request, in);
QueryPageRequest queryPageRequest = (QueryPageRequest) request;
data = queryPageRequest.data();
queryPageRequest.data().read(in);
@ -39,14 +32,13 @@ public class QueryPageResponse extends Response {
@Override
public void write(int clientVersion, DataOutput out) throws IOException {
out.writeUTF(requestId);
super.write(clientVersion, out);
data.write(out);
}
@Override
protected String toStringBody() {
return "requestId=[" + requestId
+ "] data=[\n" + data + "]";
return super.toStringBody() + " data=[\n" + data + "]";
}
@Override
@ -61,16 +53,15 @@ public class QueryPageResponse extends Response {
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
if (false == super.equals(obj)) {
return false;
}
QueryPageResponse other = (QueryPageResponse) obj;
return requestId.equals(other.requestId)
&& data.equals(other.data);
return data.equals(other.data);
}
@Override
public int hashCode() {
return Objects.hash(requestId, data);
return Objects.hash(data);
}
}

View File

@ -7,11 +7,14 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.test.RoundTripTestUtils;
import java.io.IOException;
import java.util.function.Supplier;
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
public final class JdbcRoundTripTestUtils {
private JdbcRoundTripTestUtils() {
// Just static utilities
@ -26,4 +29,8 @@ public final class JdbcRoundTripTestUtils {
(r, out) -> Proto.INSTANCE.writeResponse(r, Proto.CURRENT_VERSION, out),
in -> Proto.INSTANCE.readResponse(request.get(), in));
}
static TimeoutInfo randomTimeoutInfo() {
return new TimeoutInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}
}

View File

@ -6,16 +6,17 @@
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import java.util.TimeZone;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfoTests.randomTimeoutInfo;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo;
public class QueryInitRequestTests extends ESTestCase {
public static QueryInitRequest randomQueryInitRequest() {
static QueryInitRequest randomQueryInitRequest() {
return new QueryInitRequest(randomAlphaOfLength(5), between(0, Integer.MAX_VALUE), randomTimeZone(random()), randomTimeoutInfo());
}

View File

@ -16,8 +16,10 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.PageTests.randomPage
public class QueryInitResponseTests extends ESTestCase {
static QueryInitResponse randomQueryInitResponse() {
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
Page page = randomPage();
return new QueryInitResponse(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(5), page.columnInfo(), page);
return new QueryInitResponse(randomNonNegativeLong(), cursor, page.columnInfo(), page);
}
public void testRoundTrip() throws IOException {
@ -29,8 +31,8 @@ public class QueryInitResponseTests extends ESTestCase {
new Object[] {"test"},
new Object[] {"string"},
});
assertEquals("QueryInitResponse<timeReceived=[123] timeSent=[456] requestId=[test_id] columns=[a<type=[VARCHAR]>] data=["
assertEquals("QueryInitResponse<tookNanos=[123] cursor=[0120] columns=[a<type=[VARCHAR]>] data=["
+ "\ntest\nstring\n]>",
new QueryInitResponse(123, 456, "test_id", page.columnInfo(), page).toString());
new QueryInitResponse(123, new byte[] {0x01, 0x20}, page.columnInfo(), page).toString());
}
}

View File

@ -6,15 +6,18 @@
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfoTests.randomTimeoutInfo;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo;
public class QueryPageRequestTests extends ESTestCase {
public static QueryPageRequest randomQueryPageRequest(Page page) {
return new QueryPageRequest(randomAlphaOfLength(5), randomTimeoutInfo(), page);
static QueryPageRequest randomQueryPageRequest(Page page) {
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
return new QueryPageRequest(cursor, randomTimeoutInfo(), page);
}
public void testRoundTrip() throws IOException {
@ -22,6 +25,6 @@ public class QueryPageRequestTests extends ESTestCase {
}
public void testToString() {
assertEquals("QueryPageRequest<test_id>", new QueryPageRequest("test_id", new TimeoutInfo(1, 1, 1), null).toString());
assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1), null).toString());
}
}

View File

@ -17,7 +17,9 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequestTest
public class QueryPageResponseTests extends ESTestCase {
static QueryPageResponse randomQueryPageResponse(Page page) {
return new QueryPageResponse(randomAlphaOfLength(5), page);
byte[] cursor = new byte[between(0, 5)];
random().nextBytes(cursor);
return new QueryPageResponse(randomNonNegativeLong(), cursor, page);
}
public void testRoundTrip() throws IOException {
@ -29,6 +31,7 @@ public class QueryPageResponseTests extends ESTestCase {
Page results = new Page(singletonList(varcharInfo("a")), new Object[][] {
new Object[] {"test"}
});
assertEquals("QueryPageResponse<requestId=[test_id] data=[\ntest\n]>", new QueryPageResponse("test_id", results).toString());
assertEquals("QueryPageResponse<tookNanos=[123] cursor=[0810] data=[\ntest\n]>",
new QueryPageResponse(123, new byte[] {0x08, 0x10}, results).toString());
}
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.sql.jdbc.net.client;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Page;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import java.sql.SQLException;
import java.util.List;
@ -19,19 +18,15 @@ class DefaultCursor implements Cursor {
private final Page page;
private int row = -1;
private String requestId;
private byte[] cursor;
DefaultCursor(JdbcHttpClient client, String scrollId, Page page, RequestMeta meta) {
DefaultCursor(JdbcHttpClient client, byte[] cursor, Page page, RequestMeta meta) {
this.client = client;
this.meta = meta;
this.requestId = simplifyScrollId(scrollId);
this.cursor = cursor;
this.page = page;
}
private static String simplifyScrollId(String scrollId) {
return StringUtils.hasText(scrollId) ? scrollId : null;
}
@Override
public List<ColumnInfo> columns() {
return page.columnInfo();
@ -44,8 +39,8 @@ class DefaultCursor implements Cursor {
return true;
}
else {
if (requestId != null) {
requestId = simplifyScrollId(client.nextPage(requestId, page, meta));
if (cursor.length != 0) {
cursor = client.nextPage(cursor, page, meta);
row = -1;
return next();
}

View File

@ -23,12 +23,12 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfo;
import org.elasticsearch.xpack.sql.jdbc.util.BytesArray;
import org.elasticsearch.xpack.sql.jdbc.util.FastByteArrayInputStream;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.Closeable;
import java.io.DataInput;
@ -69,17 +69,17 @@ public class JdbcHttpClient implements Closeable {
QueryInitRequest request = new QueryInitRequest(sql, fetch, conCfg.timeZone(), timeout(meta));
BytesArray ba = http.put(out -> Proto.INSTANCE.writeRequest(request, out));
QueryInitResponse response = doIO(ba, in -> (QueryInitResponse) readResponse(request, in));
return new DefaultCursor(this, response.requestId, (Page) response.data, meta);
return new DefaultCursor(this, response.cursor(), (Page) response.data, meta);
}
/**
* Read the next page of results, updating the {@link Page} and returning
* the scroll id to use to fetch the next page.
*/
public String nextPage(String requestId, Page page, RequestMeta meta) throws SQLException {
QueryPageRequest request = new QueryPageRequest(requestId, timeout(meta), page);
public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException {
QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page);
BytesArray ba = http.put(out -> Proto.INSTANCE.writeRequest(request, out));
return doIO(ba, in -> ((QueryPageResponse) readResponse(request, in)).requestId);
return doIO(ba, in -> ((QueryPageResponse) readResponse(request, in)).cursor());
}
public InfoResponse serverInfo() throws SQLException {

View File

@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* Formats {@link SqlResponse} for the CLI. {@linkplain Writeable} so
* that its state can be saved between pages of results.
*/
public class CliFormatter implements Writeable {
/**
* The minimum width for any column in the formatted results.
*/
private static final int MIN_COLUMN_WIDTH = 15;
private int[] width;
/**
* Create a new {@linkplain CliFormatter} for formatting responses similar
* to the provided {@link SqlResponse}.
*/
public CliFormatter(SqlResponse response) {
// Figure out the column widths:
// 1. Start with the widths of the column names
width = new int[response.columns().size()];
for (int i = 0; i < width.length; i++) {
// TODO read the width from the data type?
width[i] = Math.max(MIN_COLUMN_WIDTH, response.columns().get(i).name().length());
}
// 2. Expand columns to fit the largest value
for (List<Object> row : response.rows()) {
for (int i = 0; i < width.length; i++) {
// NOCOMMIT are we sure toString is correct here? What about dates that come back as longs.
width[i] = Math.max(width[i], Objects.toString(row.get(i)).length());
}
}
}
public CliFormatter(StreamInput in) throws IOException {
width = in.readIntArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeIntArray(width);
}
/**
* Format the provided {@linkplain SqlResponse} for the CLI
* including the header lines.
*/
public String formatWithHeader(SqlResponse response) {
// The header lines
StringBuilder sb = new StringBuilder(estimateSize(response.rows().size() + 2));
for (int i = 0; i < width.length; i++) {
if (i > 0) {
sb.append('|');
}
String name = response.columns().get(i).name();
// left padding
int leftPadding = (width[i] - name.length()) / 2;
for (int j = 0; j < leftPadding; j++) {
sb.append(' ');
}
sb.append(name);
// right padding
for (int j = 0; j < width[i] - name.length() - leftPadding; j++) {
sb.append(' ');
}
}
sb.append('\n');
for (int i = 0; i < width.length; i++) {
if (i > 0) {
sb.append('+');
}
for (int j = 0; j < width[i]; j++) {
sb.append('-'); // emdash creates issues
}
}
sb.append('\n');
/* Now format the results. Sadly, this means that column
* widths are entirely determined by the first batch of
* results. */
return formatWithoutHeader(sb, response);
}
/**
* Format the provided {@linkplain SqlResponse} for the CLI
* without the header lines.
*/
public String formatWithoutHeader(SqlResponse response) {
return formatWithoutHeader(new StringBuilder(estimateSize(response.rows().size())), response).toString();
}
private String formatWithoutHeader(StringBuilder sb, SqlResponse response) {
for (List<Object> row : response.rows()) {
for (int i = 0; i < width.length; i++) {
if (i > 0) {
sb.append('|');
}
// NOCOMMIT are we sure toString is correct here? What about dates that come back as longs.
String string = Objects.toString(row.get(i));
if (string.length() <= width[i]) {
// Pad
sb.append(string);
int padding = width[i] - string.length();
for (int p = 0; p < padding; p++) {
sb.append(' ');
}
} else {
// Trim
sb.append(string.substring(0, width[i] - 1));
sb.append('~');
}
}
sb.append('\n');
}
return sb.toString();
}
/**
* Pick a good estimate of the buffer size needed to contain the rows.
*/
int estimateSize(int rows) {
/* Each column has either a '|' or a '\n' after it
* so initialize size to number of columns then add
* up the actual widths of each column. */
int rowWidthEstimate = width.length;
for (int w : width) {
rowWidthEstimate += w;
}
return rowWidthEstimate * rows;
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.main.MainAction;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
public class RestSqlCliAction extends BaseRestHandler {
private final NamedWriteableRegistry cursorRegistry = new NamedWriteableRegistry(Cursor.getNamedWriteables());
public RestSqlCliAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, "/_sql/cli", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request;
try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) {
request = Proto.INSTANCE.readRequest(in);
}
Consumer<RestChannel> consumer = operation(cursorRegistry, request, client);
return consumer::accept;
}
/**
* Actual implementation of the operation
*/
public static Consumer<RestChannel> operation(NamedWriteableRegistry cursorRegistry, Request request, Client client)
throws IOException {
RequestType requestType = (RequestType) request.requestType();
switch (requestType) {
case INFO:
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(channel, response ->
new InfoResponse(response.getNodeName(), response.getClusterName().value(),
response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
response.getBuild().shortHash(), response.getBuild().date())));
case QUERY_INIT:
return queryInit(client, (QueryInitRequest) request);
case QUERY_PAGE:
return queryPage(cursorRegistry, client, (QueryPageRequest) request);
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
}
private static Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
// TODO time zone support for CLI
SqlRequest sqlRequest = new SqlRequest(request.query, SqlRequest.DEFAULT_TIME_ZONE, Cursor.EMPTY)
.timeZone(DateTimeZone.forTimeZone(request.timeZone))
.fetchSize(request.fetchSize);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
CliFormatter formatter = new CliFormatter(response);
String data = formatter.formatWithHeader(response);
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
}));
}
private static Consumer<RestChannel> queryPage(NamedWriteableRegistry cursorRegistry, Client client, QueryPageRequest request) {
Cursor cursor;
CliFormatter formatter;
try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), cursorRegistry)) {
cursor = in.readNamedWriteable(Cursor.class);
formatter = new CliFormatter(in);
} catch (IOException e) {
throw new IllegalArgumentException("error reading the cursor");
}
SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, cursor);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
String data = formatter.formatWithoutHeader(response);
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
}));
}
private static byte[] serializeCursor(Cursor cursor, CliFormatter formatter) {
if (cursor == Cursor.EMPTY) {
return new byte[0];
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeNamedWriteable(cursor);
formatter.writeTo(out);
return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes;
} catch (IOException e) {
throw new RuntimeException("unexpected trouble building the cursor", e);
}
}
private static <T> ActionListener<T> toActionListener(RestChannel channel, Function<T, Response> responseBuilder) {
// NOCOMMIT error response
return new RestActionListener<T>(channel) {
@Override
protected void processResponse(T actionResponse) throws Exception {
Response response = responseBuilder.apply(actionResponse);
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
// NOCOMMIT use the version from the client
Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
}
channel.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes()));
}
}
};
}
@Override
public String getName() {
return "xpack_sql_cli_action";
}
}

View File

@ -27,9 +27,6 @@ import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliHttpHandler;
import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcHttpHandler;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction;
@ -81,14 +78,13 @@ public class SqlPlugin implements ActionPlugin {
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(new RestSqlAction(settings, restController),
new CliHttpHandler(settings, restController),
new RestSqlCliAction(settings, restController),
new JdbcHttpHandler(settings, restController));
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class),
new ActionHandler<>(CliAction.INSTANCE, TransportCliAction.class),
new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class),
new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class));
}

View File

@ -1,96 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.function.Supplier;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public class CliServer extends AbstractSqlServer {
private final PlanExecutor executor;
private final Supplier<InfoResponse> infoResponse;
public CliServer(PlanExecutor executor, String clusterName, Supplier<String> nodeName, Version version, Build build) {
this.executor = executor;
// Delay building the response until runtime because the node name is not available at startup
this.infoResponse = () -> new InfoResponse(nodeName.get(), clusterName, version.major, version.minor, version.toString(),
build.shortHash(), build.date());
}
@Override
protected void innerHandle(Request req, ActionListener<Response> listener) {
RequestType requestType = (RequestType) req.requestType();
try {
switch (requestType) {
case INFO:
listener.onResponse(info((InfoRequest) req));
break;
case COMMAND:
command((CommandRequest) req, listener);
break;
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
} catch (Exception ex) {
listener.onResponse(exceptionResponse(req, ex));
}
}
@Override
protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) {
return new ErrorResponse((RequestType) request.requestType(), message, cause, stack);
}
@Override
protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause,
SqlExceptionType exceptionType) {
return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType);
}
public InfoResponse info(InfoRequest req) {
return infoResponse.get();
}
public void command(CommandRequest req, ActionListener<Response> listener) {
final long start = System.currentTimeMillis(); // NOCOMMIT should be nanoTime or else clock skew will skew us
// NOCOMMIT: need to add settings for CLI
// TODO support non-utc for cli server
executor.sql(req.command, wrap(
c -> {
long stop = System.currentTimeMillis();
String requestId = EMPTY;
if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) {
requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId());
}
// NOCOMMIT it looks like this tries to buffer the entire response in memory before returning it which is going to OOM some po
// NOCOMMIT also this blocks the current thread while it iterates the cursor
listener.onResponse(new CommandResponse(start, stop, requestId, CliUtils.toString(c)));
},
ex -> listener.onResponse(exceptionResponse(req, ex))));
}
}

View File

@ -1,122 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
abstract class CliUtils {
// this toString is a bit convoluted since it tries to be smart and pad the columns according to their values
// as such it will look inside the row, find the max for each column and pad all the values accordingly
// things are more complicated when the value is split across multiple lines (like a plan) in which case
// a row needs to be iterated upon to fill up the values that don't take extra lines
// Warning: this method _consumes_ a rowset
static String toString(RowSetCursor cursor) {
if (cursor.rowSize() == 1 && cursor.size() == 1 && cursor.column(0).toString().startsWith("digraph ")) {
return cursor.column(0).toString();
}
StringBuilder sb = new StringBuilder();
// use the schema as a header followed by
// do a best effort to compute the width of the column
int[] width = new int[cursor.rowSize()];
for (int i = 0; i < cursor.rowSize(); i++) {
// TODO: default schema width
width[i] = Math.max(15, cursor.schema().names().get(i).length());
}
AtomicBoolean firstRun = new AtomicBoolean(true);
// take a look at the first values
cursor.forEachSet(rowSet -> {
for (boolean shouldRun = rowSet.hasCurrentRow(); shouldRun; shouldRun = rowSet.advanceRow()) {
for (int column = 0; column < rowSet.rowSize(); column++) {
if (column > 0) {
sb.append("|");
}
String val = String.valueOf(rowSet.column(column));
// the value might contain multiple lines (plan execution for example)
// TODO: this needs to be improved to properly scale each row across multiple lines
String[] split = val.split("\\n");
if (firstRun.get()) {
// find max across splits
for (int splitIndex = 0; splitIndex < split.length; splitIndex++) {
width[column] = Math.max(width[column], split[splitIndex].length());
}
}
for (int splitIndex = 0; splitIndex < split.length; splitIndex++) {
if (splitIndex > 0) {
sb.append("\n");
}
String string = split[splitIndex];
// does the value fit the column ?
if (string.length() <= width[column]) {
sb.append(string);
// pad value
for (int k = 0; k < width[column] - string.length(); k++) {
sb.append(" ");
}
}
// no, then trim it
else {
sb.append(string.substring(0, width[column] - 1));
sb.append("~");
}
}
}
sb.append("\n");
firstRun.set(false);
}
});
// compute the header
StringBuilder header = new StringBuilder();
for (int i = 0; i < cursor.rowSize(); i++) {
if (i > 0) {
header.append("|");
}
String name = cursor.schema().names().get(i);
// left padding
int leftPadding = (width[i] - name.length()) / 2;
for (int j = 0; j < leftPadding; j++) {
header.append(" ");
}
header.append(name);
// right padding
for (int j = 0; j < width[i] - name.length() - leftPadding; j++) {
header.append(" ");
}
}
header.append("\n");
for (int i = 0; i < cursor.rowSize(); i++) {
if (i > 0) {
header.append("+");
}
for (int j = 0; j < width[i]; j++) {
header.append("-"); // emdash creates issues
}
}
header.append("\n");
// append the header
sb.insert(0, header.toString());
return sb.toString();
}
}

View File

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class CliAction extends Action<CliRequest, CliResponse, CliRequestBuilder> {
public static final CliAction INSTANCE = new CliAction();
public static final String NAME = "indices:data/read/sql/cli";
private CliAction() {
super(NAME);
}
@Override
public CliRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new CliRequestBuilder(client, this);
}
@Override
public CliResponse newResponse() {
return new CliResponse();
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import java.io.IOException;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
public class CliHttpHandler extends BaseRestHandler {
public CliHttpHandler(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, "/_cli", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!request.hasContent()) {
throw new IllegalArgumentException("expected a request body");
}
CliRequest cliRequest = new CliRequest(request.content());
return c -> client.executeLocally(CliAction.INSTANCE, cliRequest,
ActionListener.wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())),
ex -> error(c, ex)));
}
private static void error(RestChannel channel, Exception ex) {
BytesRestResponse response;
try {
response = new BytesRestResponse(channel, ex);
} catch (IOException e) {
response = new BytesRestResponse(OK, TEXT_CONTENT_TYPE, ExceptionsHelper.stackTrace(e));
}
channel.sendResponse(response);
}
@Override
public String getName() {
return "sql_cli_action";
}
}

View File

@ -1,118 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class CliRequest extends ActionRequest implements CompositeIndicesRequest {
private BytesReference bytesReference;
public CliRequest() {
}
public CliRequest(Request request) {
try {
request(request);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
public CliRequest(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (bytesReference == null) {
validationException = addValidationError("no request has been specified", validationException);
}
return validationException;
}
/**
* Gets the response object from internally stored serialized version
*/
public Request request() throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readRequest(in);
}
}
/**
* Converts the response object into internally stored serialized version
*/
public CliRequest request(Request request) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeRequest(request, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public int hashCode() {
return Objects.hash(bytesReference);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
CliRequest other = (CliRequest) obj;
return Objects.equals(bytesReference, other.bytesReference);
}
@Override
public String getDescription() {
try {
return "SQL CLI [" + request() + "]";
} catch (IOException ex) {
return "SQL CLI [" + ex.getMessage() + "]";
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
public class CliRequestBuilder extends ActionRequestBuilder<CliRequest, CliResponse, CliRequestBuilder> {
public CliRequestBuilder(ElasticsearchClient client, CliAction action) {
super(client, action, new CliRequest());
}
public CliRequestBuilder(ElasticsearchClient client, CliAction action, Request req) {
super(client, action, new CliRequest(req));
}
public CliRequestBuilder request(Request req)throws IOException {
request.request(req);
return this;
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
public class CliResponse extends ActionResponse {
private BytesReference bytesReference;
public CliResponse() {
}
public CliResponse(BytesReference bytesReference) {
this.bytesReference = bytesReference;
}
public CliResponse(Response response) {
try {
response(response);
} catch (IOException ex) {
throw new IllegalArgumentException("cannot serialize the request", ex);
}
}
/**
* Gets the response object from internally stored serialized version
*
* @param request the request that was used to generate this response
*/
public Response response(Request request) throws IOException {
try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
return Proto.INSTANCE.readResponse(request, in);
}
}
/**
* Serialized the response object into internally stored serialized version
*/
public CliResponse response(Response response) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
}
bytesReference = bytesStreamOutput.bytes();
}
return this;
}
public BytesReference bytesReference() {
return bytesReference;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytesReference = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytesReference);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CliResponse that = (CliResponse) o;
return Objects.equals(bytesReference, that.bytesReference);
}
@Override
public int hashCode() {
return Objects.hash(bytesReference);
}
}

View File

@ -1,58 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.plugin.cli.CliServer;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
public class TransportCliAction extends HandledTransportAction<CliRequest, CliResponse> {
private final CliServer cliServer;
private final SqlLicenseChecker sqlLicenseChecker;
@Inject
public TransportCliAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor,
SqlLicenseChecker sqlLicenseChecker) {
super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
this.sqlLicenseChecker = sqlLicenseChecker;
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(),
() -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
}
@Override
protected void doExecute(CliRequest cliRequest, ActionListener<CliResponse> listener) {
sqlLicenseChecker.checkIfSqlAllowed();
final Request request;
try {
request = cliRequest.request();
} catch (IOException ex) {
listener.onFailure(ex);
return;
}
// NOCOMMIT we need to pass the protocol version of the client to the response here
cliServer.handle(request, chain(listener, CliResponse::new));
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
@ -134,7 +133,7 @@ public class JdbcServer extends AbstractSqlServer {
public void queryInit(QueryInitRequest req, ActionListener<Response> listener) {
final long start = System.currentTimeMillis();
final long start = System.nanoTime();
SqlSettings sqlCfg = new SqlSettings(Settings.builder()
.put(SqlSettings.PAGE_SIZE, req.fetchSize)
@ -144,17 +143,14 @@ public class JdbcServer extends AbstractSqlServer {
//NOCOMMIT: this should be pushed down to the TransportSqlAction to hook up pagination
executor.sql(sqlCfg, req.query, wrap(c -> {
long stop = System.currentTimeMillis();
String requestId = EMPTY;
if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) {
requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId());
}
long stop = System.nanoTime();
List<ColumnInfo> columnInfo = c.schema().stream()
.map(e -> new ColumnInfo(e.name(), e.type().sqlType(), EMPTY, EMPTY, EMPTY, EMPTY))
.collect(toList());
listener.onResponse(new QueryInitResponse(start, stop, requestId, columnInfo, new RowSetPayload(c)));
// NOCOMMIT paging for jdbc
listener.onResponse(new QueryInitResponse(stop - start, new byte[0], columnInfo, new RowSetPayload(c)));
}, ex -> listener.onResponse(exceptionResponse(req, ex))));
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
@ -32,7 +33,7 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
}
public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC;
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final int DEFAULT_FETCH_SIZE = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE;
private String query = "";
private DateTimeZone timeZone = DEFAULT_TIME_ZONE;
@ -41,10 +42,19 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
public SqlRequest() {}
public SqlRequest(String query, DateTimeZone timeZone, Cursor nextPageInfo) {
public SqlRequest(String query, DateTimeZone timeZone, Cursor cursor) {
if (query == null) {
throw new IllegalArgumentException("query must not be null");
}
if (timeZone == null) {
throw new IllegalArgumentException("timeZone must not be null");
}
if (cursor == null) {
throw new IllegalArgumentException("cursor must not be null");
}
this.query = query;
this.timeZone = timeZone;
this.cursor = nextPageInfo;
this.cursor = cursor;
}
@Override

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -24,7 +23,6 @@ import java.util.Objects;
import static java.util.Collections.unmodifiableList;
public class SqlResponse extends ActionResponse implements ToXContentObject {
private Cursor cursor;
private long size;
private int columnCount;
@ -44,10 +42,9 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
/**
* The key that must be sent back to SQL to access the next page of
* results. If {@link BytesReference#length()} is {@code 0} then
* there is no next page.
* results. If equal to {@link Cursor#EMPTY} then there is no next page.
*/
public Cursor nextPageInfo() {
public Cursor cursor() {
return cursor;
}
@ -164,6 +161,9 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
return Strings.toString(this);
}
/**
* Information about a column.
*/
public static final class ColumnInfo implements Writeable, ToXContentObject {
// NOCOMMIT: we probably need to add more info about columns, but that's all we use for now
private final String name;

View File

@ -45,6 +45,13 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
@Override
protected void doExecute(SqlRequest request, ActionListener<SqlResponse> listener) {
sqlLicenseChecker.checkIfSqlAllowed();
operation(planExecutor, request, listener);
}
/**
* Actual implementation of the action. Statically available to support embedded mode.
*/
public static void operation(PlanExecutor planExecutor, SqlRequest request, ActionListener<SqlResponse> listener) {
if (request.cursor() == Cursor.EMPTY) {
SqlSettings sqlSettings = new SqlSettings(Settings.builder()
.put(SqlSettings.PAGE_SIZE, request.fetchSize())

View File

@ -41,7 +41,7 @@ public class RestSqlAction extends BaseRestHandler {
@Override
public String getName() {
return "sql_action";
return "xpack_sql_action";
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.Arrays;
import static org.hamcrest.Matchers.arrayWithSize;
public class CliFormatterTests extends ESTestCase {
private final SqlResponse firstResponse = new SqlResponse(Cursor.EMPTY, 10, 5,
Arrays.asList(
new ColumnInfo("foo", "string"),
new ColumnInfo("bar", "long"),
new ColumnInfo("15charwidename!", "double"),
new ColumnInfo("superduperwidename!!!", "double"),
new ColumnInfo("baz", "keyword")),
Arrays.asList(
Arrays.asList("15charwidedata!", 1, 6.888, 12, "rabbit"),
Arrays.asList("dog", 1.7976931348623157E308, 123124.888, 9912, "goat")));
private final CliFormatter formatter = new CliFormatter(firstResponse);
/**
* Tests for {@link CliFormatter#formatWithHeader(SqlResponse)}, values
* of exactly the minimum column size, column names of exactly
* the minimum column size, column headers longer than the
* minimum column size, and values longer than the minimum
* column size.
*/
public void testFormatWithHeader() {
String[] result = formatter.formatWithHeader(firstResponse).split("\n");
assertThat(result, arrayWithSize(4));
assertEquals(" foo | bar |15charwidename!|superduperwidename!!!| baz ", result[0]);
assertEquals("---------------+----------------------+---------------+---------------------+---------------", result[1]);
assertEquals("15charwidedata!|1 |6.888 |12 |rabbit ", result[2]);
assertEquals("dog |1.7976931348623157E308|123124.888 |9912 |goat ", result[3]);
}
/**
* Tests for {@link CliFormatter#formatWithoutHeader(SqlResponse)} and
* truncation of long columns.
*/
public void testFormatWithoutHeader() {
String[] result = formatter.formatWithoutHeader(new SqlResponse(Cursor.EMPTY, 10, 5, null,
Arrays.asList(
Arrays.asList("ohnotruncateddata", 4, 1, 77, "wombat"),
Arrays.asList("dog", 2, 123124.888, 9912, "goat")))).split("\n");
assertThat(result, arrayWithSize(2));
assertEquals("ohnotruncatedd~|4 |1 |77 |wombat ", result[0]);
assertEquals("dog |2 |123124.888 |9912 |goat ", result[1]);
}
/**
* Ensure that our estimates are perfect in at least some cases.
*/
public void testEstimateSize() {
assertEquals(formatter.formatWithHeader(firstResponse).length(),
formatter.estimateSize(firstResponse.rows().size() + 2));
assertEquals(formatter.formatWithoutHeader(firstResponse).length(),
formatter.estimateSize(firstResponse.rows().size()));
}
}

View File

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliRequest;
public class CliRequestTests extends AbstractStreamableTestCase<CliRequest> {
@Override
protected CliRequest createTestInstance() {
if (randomBoolean()) {
return new CliRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10),
randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new CliRequest(new CommandRequest(randomAlphaOfLength(10)));
}
}
@Override
protected CliRequest createBlankInstance() {
return new CliRequest();
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
public class CliResponseTests extends AbstractStreamableTestCase<CliResponse> {
@Override
protected CliResponse createTestInstance() {
if (randomBoolean()) {
return new CliResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10),
randomByte(), randomByte(),
randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)));
} else {
return new CliResponse(new CommandResponse(randomNonNegativeLong(), randomNonNegativeLong(),
randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
}
@Override
protected CliResponse createBlankInstance() {
return new CliResponse();
}
}

View File

@ -38,8 +38,7 @@ public abstract class AbstractProto {
public Request readRequest(DataInput in) throws IOException {
int clientVersion = readHeader(in);
if (clientVersion > CURRENT_VERSION) {
throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade sql last.");
// NOCOMMIT I believe we usually advise upgrading the clients *first* so this might be backwards.....
throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade client last.");
}
return readRequestType(in).reader().read(clientVersion, in);
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import java.util.TimeZone;
public abstract class AbstractQueryInitRequest extends Request {
/**
* Global choice for the default fetch size.
*/
public static final int DEFAULT_FETCH_SIZE = 1000;
public final String query;
public final int fetchSize;
public final TimeZone timeZone;
public final TimeoutInfo timeout;
protected AbstractQueryInitRequest(String query, int fetchSize, TimeZone timeZone, TimeoutInfo timeout) {
this.query = query;
this.fetchSize = fetchSize;
this.timeZone = timeZone;
this.timeout = timeout;
}
protected AbstractQueryInitRequest(int clientVersion, DataInput in) throws IOException {
query = in.readUTF();
fetchSize = in.readInt();
timeZone = TimeZone.getTimeZone(in.readUTF());
timeout = new TimeoutInfo(in);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(query);
out.writeInt(fetchSize);
out.writeUTF(timeZone.getID());
timeout.write(out);
}
@Override
protected String toStringBody() {
StringBuilder b = new StringBuilder();
b.append("query=[").append(query).append(']');
if (false == timeZone.getID().equals("UTC")) {
b.append(" timeZone=[").append(timeZone.getID()).append(']');
}
return b.toString();
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractQueryInitRequest other = (AbstractQueryInitRequest) obj;
return fetchSize == other.fetchSize
&& Objects.equals(query, other.query)
&& Objects.equals(timeout, other.timeout)
&& Objects.equals(timeZone.getID(), other.timeZone.getID());
}
@Override
public int hashCode() {
return Objects.hash(fetchSize, query, timeout, timeZone.getID().hashCode());
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
public abstract class AbstractQueryPageRequest extends Request {
public final byte[] cursor;
public final TimeoutInfo timeout;
protected AbstractQueryPageRequest(byte[] cursor, TimeoutInfo timeout) {
if (cursor == null) {
throw new IllegalArgumentException("[cursor] must not be null");
}
if (timeout == null) {
throw new IllegalArgumentException("[timeout] must not be null");
}
this.cursor = cursor;
this.timeout = timeout;
}
protected AbstractQueryPageRequest(int clientVersion, DataInput in) throws IOException {
this.cursor = new byte[ProtoUtil.readArraySize(in)];
in.readFully(cursor);
this.timeout = new TimeoutInfo(in);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(cursor.length);
out.write(cursor);
timeout.write(out);
}
@Override
protected String toStringBody() {
StringBuilder b = new StringBuilder();
for (int i = 0; i < cursor.length; i++) {
b.append(String.format(Locale.ROOT, "%02x", cursor[i]));
}
return b.toString();
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractQueryPageRequest other = (AbstractQueryPageRequest) obj;
return Arrays.equals(cursor, other.cursor)
&& timeout.equals(other.timeout);
}
@Override
public int hashCode() {
return Objects.hash(cursor, timeout);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
/**
* Superclass for responses both for {@link AbstractQueryInitRequest}
* and {@link AbstractQueryPageRequest}.
*/
public abstract class AbstractQueryResponse extends Response {
private final long tookNanos;
private final byte[] cursor;
protected AbstractQueryResponse(long tookNanos, byte[] cursor) {
if (cursor == null) {
throw new IllegalArgumentException("cursor must not be null");
}
this.tookNanos = tookNanos;
this.cursor = cursor;
}
protected AbstractQueryResponse(Request request, DataInput in) throws IOException {
tookNanos = in.readLong();
cursor = new byte[ProtoUtil.readArraySize(in)];
in.readFully(cursor);
}
@Override
protected void write(int clientVersion, DataOutput out) throws IOException {
out.writeLong(tookNanos);
out.writeInt(cursor.length);
out.write(cursor);
}
/**
* How long the request took on the server as measured by
* {@link System#nanoTime()}.
*/
public long tookNanos() {
return tookNanos;
}
/**
* Cursor for fetching the next page. If it has {@code length = 0}
* then there is no next page.
*/
public byte[] cursor() {
return cursor;
}
@Override
protected String toStringBody() {
StringBuilder b = new StringBuilder();
b.append("tookNanos=[").append(tookNanos);
b.append("] cursor=[");
for (int i = 0; i < cursor.length; i++) {
b.append(String.format(Locale.ROOT, "%02x", cursor[i]));
}
b.append("]");
return b.toString();
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractQueryResponse other = (AbstractQueryResponse) obj;
return tookNanos == other.tookNanos
&& Arrays.equals(cursor, other.cursor);
}
@Override
public int hashCode() {
return Objects.hash(tookNanos, Arrays.hashCode(cursor));
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.IOException;
public class ProtoUtil {
private static final int MAX_ARRAY_SIZE = 5 * 1024 * 1024 * 1024;
public static int readArraySize(DataInput in) throws IOException {
int length = in.readInt();
if (length > MAX_ARRAY_SIZE) {
throw new IOException("array size unbelievably long [" + length + "]");
}
return length;
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
package org.elasticsearch.xpack.sql.protocol.shared;
import org.elasticsearch.test.ESTestCase;

View File

@ -26,12 +26,14 @@ public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {
protected static final Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName());
private final TimeValue TV = TimeValue.timeValueSeconds(5);
protected final Client client;
protected final NodeInfo info;
protected final String clusterName;
private final CheckedFunction<R, BytesReference, IOException> toProto;
protected ProtoHandler(Client client, CheckedFunction<R, BytesReference, IOException> toProto) {
NodesInfoResponse niResponse = client.admin().cluster().prepareNodesInfo("_local").clear().get(TV);
this.client = client;
info = niResponse.getNodes().get(0);
clusterName = niResponse.getClusterName().value();