diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java index 2559365e8a7..18cde69a15a 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java @@ -13,7 +13,7 @@ import java.io.DataInput; import java.io.IOException; public class QueryInitResponse extends QueryResponse { - public QueryInitResponse(long tookNanos, byte[] cursor, String data) { + public QueryInitResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor, data); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java index 5e8b45c3ac7..e30417bc58e 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java @@ -13,7 +13,7 @@ import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput; import java.io.IOException; public class QueryPageRequest extends AbstractQueryPageRequest { - public QueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + public QueryPageRequest(String cursor, TimeoutInfo timeout) { super(cursor, timeout); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java index eaa696218f5..5bce5284460 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java @@ -13,7 +13,7 @@ import java.io.DataInput; import java.io.IOException; public class QueryPageResponse extends QueryResponse { - public QueryPageResponse(long tookNanos, byte[] cursor, String data) { + public QueryPageResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor, data); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java index 581af4ab69b..1cd48f212d0 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java @@ -16,7 +16,7 @@ import java.util.Objects; public abstract class QueryResponse extends AbstractQueryResponse { public final String data; - protected QueryResponse(long tookNanos, byte[] cursor, String data) { + protected QueryResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor); if (data == null) { throw new IllegalArgumentException("data cannot be null"); diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java index 54d67faa0a6..8facca8b723 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java @@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests public class QueryInitResponseTests extends ESTestCase { static QueryInitResponse randomQueryInitResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryInitResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); } @@ -25,6 +24,6 @@ public class QueryInitResponseTests extends ESTestCase { public void testToString() { assertEquals("QueryInitResponse", - new QueryInitResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + new QueryInitResponse(123, "0103", "test").toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java index 0dc3112857c..8680b848046 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java @@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils public class QueryPageRequestTests extends ESTestCase { static QueryPageRequest randomQueryPageRequest() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageRequest(cursor, randomTimeoutInfo()); } @@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase { } public void testToString() { - assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1)).toString()); + assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1)).toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java index b6e278ff192..bd7413b6eda 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java @@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequestTests public class QueryPageResponseTests extends ESTestCase { static QueryPageResponse randomQueryPageResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); } @@ -25,6 +24,6 @@ public class QueryPageResponseTests extends ESTestCase { public void testToString() { assertEquals("QueryPageResponse", - new QueryPageResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + new QueryPageResponse(123, "0103", "test").toString()); } } diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java index 1fb37e43be8..c187e7a7424 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java @@ -309,7 +309,7 @@ public class Cli { while (true) { term.writer().print(ResponseToString.toAnsi(response).toAnsi(term)); term.writer().flush(); - if (response.cursor().length == 0) { + if (response.cursor().isEmpty()) { // Successfully finished the entire query! return; } diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java index 46bd57095e6..4e8d6d5766d 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java @@ -42,7 +42,7 @@ public class CliHttpClient { return (QueryResponse) post(request); } - public QueryResponse nextPage(byte[] cursor) throws SQLException { + public QueryResponse nextPage(String cursor) throws SQLException { QueryPageRequest request = new QueryPageRequest(cursor, timeout()); return (QueryResponse) post(request); } diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java index 4577953432c..fa065cc3a53 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java @@ -16,13 +16,13 @@ import static org.mockito.Mockito.when; public class ResponseToStringTests extends ESTestCase { public void testQueryInitResponse() { - AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, new byte[0], "some command response")); + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, "", "some command response")); assertEquals("some command response", unstyled(s)); assertEquals("[37msome command response[0m", fullyStyled(s)); } public void testQueryPageResponse() { - AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, new byte[0], "some command response")); + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, "", "some command response")); assertEquals("some command response", unstyled(s)); assertEquals("[37msome command response[0m", fullyStyled(s)); } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java index 0d98156fab7..b21715f88be 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java @@ -23,7 +23,7 @@ public class QueryInitResponse extends AbstractQueryResponse { public final List columns; public final Payload data; - public QueryInitResponse(long tookNanos, byte[] cursor, List columns, Payload data) { + public QueryInitResponse(long tookNanos, String cursor, List columns, Payload data) { super(tookNanos, cursor); this.columns = columns; this.data = data; diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java index 16ece0cb961..9d1a9b56488 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java @@ -16,7 +16,7 @@ import java.io.IOException; public class QueryPageRequest extends AbstractQueryPageRequest { private final transient Payload data; - public QueryPageRequest(byte[] cursor, TimeoutInfo timeout, @Nullable Payload data) { + public QueryPageRequest(String cursor, TimeoutInfo timeout, @Nullable Payload data) { super(cursor, timeout); this.data = data; } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java index 9e7040ba41d..27b5ba8c1db 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java @@ -18,7 +18,7 @@ import java.util.Objects; public class QueryPageResponse extends AbstractQueryResponse { private final Payload data; - public QueryPageResponse(long tookNanos, byte[] cursor, Payload data) { + public QueryPageResponse(long tookNanos, String cursor, Payload data) { super(tookNanos, cursor); this.data = data; } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java index f819d2dcd37..0a83ea15c41 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java @@ -16,8 +16,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.PageTests.randomPage public class QueryInitResponseTests extends ESTestCase { static QueryInitResponse randomQueryInitResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); Page page = randomPage(); return new QueryInitResponse(randomNonNegativeLong(), cursor, page.columnInfo(), page); } @@ -33,6 +32,6 @@ public class QueryInitResponseTests extends ESTestCase { }); assertEquals("QueryInitResponse] data=[" + "\ntest\nstring\n]>", - new QueryInitResponse(123, new byte[] {0x01, 0x20}, page.columnInfo(), page).toString()); + new QueryInitResponse(123, "0120", page.columnInfo(), page).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java index 0f4a33dbe09..189398b82d9 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java @@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUti public class QueryPageRequestTests extends ESTestCase { static QueryPageRequest randomQueryPageRequest(Page page) { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageRequest(cursor, randomTimeoutInfo(), page); } @@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase { } public void testToString() { - assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1), null).toString()); + assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1), null).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java index 538e44c1b97..1e8f38ba97c 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java @@ -17,8 +17,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequestTest public class QueryPageResponseTests extends ESTestCase { static QueryPageResponse randomQueryPageResponse(Page page) { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageResponse(randomNonNegativeLong(), cursor, page); } @@ -32,6 +31,6 @@ public class QueryPageResponseTests extends ESTestCase { new Object[] {"test"} }); assertEquals("QueryPageResponse", - new QueryPageResponse(123, new byte[] {0x08, 0x10}, results).toString()); + new QueryPageResponse(123, "0810", results).toString()); } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java index de279346c06..02cc8aa8d9c 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java @@ -18,9 +18,9 @@ class DefaultCursor implements Cursor { private final Page page; private int row = -1; - private byte[] cursor; + private String cursor; - DefaultCursor(JdbcHttpClient client, byte[] cursor, Page page, RequestMeta meta) { + DefaultCursor(JdbcHttpClient client, String cursor, Page page, RequestMeta meta) { this.client = client; this.meta = meta; this.cursor = cursor; @@ -39,7 +39,7 @@ class DefaultCursor implements Cursor { return true; } else { - if (cursor.length != 0) { + if (cursor.isEmpty() == false) { cursor = client.nextPage(cursor, page, meta); row = -1; return next(); diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java index aebb7ecabce..13638db33cc 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java @@ -64,7 +64,7 @@ public class JdbcHttpClient { * Read the next page of results, updating the {@link Page} and returning * the scroll id to use to fetch the next page. */ - public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException { + public String nextPage(String cursor, Page page, RequestMeta meta) throws SQLException { QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page); return ((QueryPageResponse) http.post(request)).cursor(); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java index bc0d487727a..1673da80636 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java @@ -24,6 +24,13 @@ public class CliFormatterCursor implements Cursor { private Cursor delegate; private CliFormatter formatter; + public static Cursor wrap(Cursor newCursor, CliFormatter formatter) { + if (newCursor == EMPTY) { + return EMPTY; + } + return new CliFormatterCursor(newCursor, formatter); + } + public CliFormatterCursor(Cursor delegate, CliFormatter formatter) { this.delegate = delegate; this.formatter = formatter; @@ -39,6 +46,7 @@ public class CliFormatterCursor implements Cursor { out.writeNamedWriteable(delegate); formatter.writeTo(out); } + public CliFormatter getCliFormatter() { return formatter; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java new file mode 100644 index 00000000000..cdb370fa291 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.RowSet; + +import java.io.IOException; +import java.sql.JDBCType; +import java.util.ArrayList; +import java.util.List; + +/** + * The cursor that wraps all necessary information for jdbc + */ +public class JdbcCursor implements Cursor { + public static final String NAME = "j"; + private Cursor delegate; + private List types; + + + public static Cursor wrap(Cursor newCursor, List types) { + if (newCursor == EMPTY) { + return EMPTY; + } + return new JdbcCursor(newCursor, types); + } + + public JdbcCursor(Cursor delegate, List types) { + this.delegate = delegate; + this.types = types; + } + + public JdbcCursor(StreamInput in) throws IOException { + delegate = in.readNamedWriteable(Cursor.class); + int size = in.readVInt(); + types = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + types.add(JDBCType.valueOf(in.readVInt())); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(delegate); + out.writeVInt(types.size()); + for (JDBCType type : types) { + out.writeVInt(type.getVendorTypeNumber()); + } + } + + public List getTypes() { + return types; + } + + @Override + public void nextPage(Configuration cfg, Client client, ActionListener listener) { + delegate.nextPage(cfg, client, listener); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java index 39d653f9839..26d162ddad3 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java @@ -5,14 +5,10 @@ */ package org.elasticsearch.xpack.sql.plugin; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.main.MainRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestChannel; @@ -83,19 +79,17 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { CliFormatter formatter = new CliFormatter(response); String data = formatter.formatWithHeader(response); - return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + return new QueryInitResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data); })); } private Consumer queryPage(Client client, QueryPageRequest request) { - Cursor cursor; - CliFormatter formatter; - try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) { - cursor = in.readNamedWriteable(Cursor.class); - formatter = new CliFormatter(in); - } catch (IOException e) { - throw new IllegalArgumentException("error reading the cursor"); + Cursor cursor = Cursor.decodeFromString(request.cursor); + if (cursor instanceof CliFormatterCursor == false) { + throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]"); } + CliFormatter formatter = ((CliFormatterCursor)cursor).getCliFormatter(); SqlRequest sqlRequest = new SqlRequest("", null, SqlRequest.DEFAULT_TIME_ZONE, 0, TimeValue.timeValueMillis(request.timeout.requestTimeout), TimeValue.timeValueMillis(request.timeout.pageTimeout), @@ -104,20 +98,8 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { long start = System.nanoTime(); return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { String data = formatter.formatWithoutHeader(response); - return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + return new QueryPageResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data); })); } - - private static byte[] serializeCursor(Cursor cursor, CliFormatter formatter) { - if (cursor == Cursor.EMPTY) { - return new byte[0]; - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeNamedWriteable(cursor); - formatter.writeTo(out); - return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes; - } catch (IOException e) { - throw new RuntimeException("unexpected trouble building the cursor", e); - } - } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java index 82ade18a019..56d3e979b36 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java @@ -5,16 +5,12 @@ */ package org.elasticsearch.xpack.sql.plugin; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.main.MainRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestChannel; @@ -163,7 +159,6 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { } private Consumer queryInit(Client client, QueryInitRequest request) { - SqlRequest sqlRequest = new SqlRequest(request.query, null, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, TimeValue.timeValueMillis(request.timeout.requestTimeout), TimeValue.timeValueMillis(request.timeout.pageTimeout), @@ -176,20 +171,18 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { types.add(info.jdbcType()); columns.add(new ColumnInfo(info.name(), info.jdbcType(), EMPTY, EMPTY, EMPTY, EMPTY, info.displaySize())); } - return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), columns, + return new QueryInitResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), columns, new SqlResponsePayload(types, response.rows())); })); } private Consumer queryPage(Client client, QueryPageRequest request) { - Cursor cursor; - List types; - try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) { - cursor = in.readNamedWriteable(Cursor.class); - types = in.readList(r -> JDBCType.valueOf(r.readVInt())); - } catch (IOException e) { - throw new IllegalArgumentException("error reading the cursor"); + Cursor cursor = Cursor.decodeFromString(request.cursor); + if (cursor instanceof JdbcCursor == false) { + throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]"); } + List types = ((JdbcCursor)cursor).getTypes(); // NB: the timezone and page size are locked already by the query so pass in defaults (as they are not read anyway) SqlRequest sqlRequest = new SqlRequest(EMPTY, null, SqlRequest.DEFAULT_TIME_ZONE, 0, TimeValue.timeValueMillis(request.timeout.requestTimeout), @@ -197,23 +190,8 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { cursor); long start = System.nanoTime(); return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, - response -> new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), + response -> new QueryPageResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), new SqlResponsePayload(types, response.rows())))); } - - private static byte[] serializeCursor(Cursor cursor, List types) { - if (cursor == Cursor.EMPTY) { - return new byte[0]; - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeNamedWriteable(cursor); - out.writeVInt(types.size()); - for (JDBCType type : types) { - out.writeVInt(type.getVendorTypeNumber()); - } - return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes; - } catch (IOException e) { - throw new RuntimeException("unexpected trouble building the cursor", e); - } - } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java index 1e331997b68..9418dbb66af 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java @@ -66,13 +66,7 @@ public class RestSqlAction extends BaseRestHandler { data = formatter.formatWithHeader(response); } - final Cursor responseCursor; - if (response.cursor() == Cursor.EMPTY) { - responseCursor = Cursor.EMPTY; - } else { - responseCursor = new CliFormatterCursor(response.cursor(), formatter); - } - return buildTextResponse(responseCursor, System.nanoTime() - startNanos, data); + return buildTextResponse(CliFormatterCursor.wrap(response.cursor(), formatter), System.nanoTime() - startNanos, data); } }); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index 4a438e7aeec..b13ccaf570a 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; import org.elasticsearch.xpack.sql.plugin.CliFormatterCursor; +import org.elasticsearch.xpack.sql.plugin.JdbcCursor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -50,6 +51,7 @@ public interface Cursor extends NamedWriteable { entries.add(new NamedWriteableRegistry.Entry(Cursor.class, EmptyCursor.NAME, in -> EMPTY)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CliFormatterCursor.NAME, CliFormatterCursor::new)); + entries.add(new NamedWriteableRegistry.Entry(Cursor.class, JdbcCursor.NAME, JdbcCursor::new)); return entries; } @@ -57,6 +59,9 @@ public interface Cursor extends NamedWriteable { * Write a {@linkplain Cursor} to a string for serialization across xcontent. */ static String encodeToString(Version version, Cursor info) { + if(info == EMPTY) { + return ""; + } try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) { @@ -74,6 +79,9 @@ public interface Cursor extends NamedWriteable { * Read a {@linkplain Cursor} from a string. */ static Cursor decodeFromString(String info) { + if (info.isEmpty()) { + return EMPTY; + } byte[] bytes = info.getBytes(StandardCharsets.UTF_8); try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new ByteArrayInputStream(bytes))); StreamInput in = new NamedWriteableAwareStreamInput(delegate, CURSOR_REGISTRY)) { diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java index f939ccb0fd8..cf4ce9c2968 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java @@ -6,15 +6,13 @@ package org.elasticsearch.xpack.sql.protocol.shared; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; import java.util.Objects; public abstract class AbstractQueryPageRequest extends Request { - public final byte[] cursor; + public final String cursor; public final TimeoutInfo timeout; - protected AbstractQueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + protected AbstractQueryPageRequest(String cursor, TimeoutInfo timeout) { if (cursor == null) { throw new IllegalArgumentException("[cursor] must not be null"); } @@ -26,25 +24,19 @@ public abstract class AbstractQueryPageRequest extends Request { } protected AbstractQueryPageRequest(SqlDataInput in) throws IOException { - this.cursor = new byte[ProtoUtil.readArraySize(in)]; - in.readFully(cursor); + this.cursor = in.readUTF(); this.timeout = new TimeoutInfo(in); } @Override public void writeTo(SqlDataOutput out) throws IOException { - out.writeInt(cursor.length); - out.write(cursor); + out.writeUTF(cursor); timeout.writeTo(out); } @Override protected String toStringBody() { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < cursor.length; i++) { - b.append(String.format(Locale.ROOT, "%02x", cursor[i])); - } - return b.toString(); + return cursor; } @Override @@ -53,7 +45,7 @@ public abstract class AbstractQueryPageRequest extends Request { return false; } AbstractQueryPageRequest other = (AbstractQueryPageRequest) obj; - return Arrays.equals(cursor, other.cursor) + return Objects.equals(cursor, other.cursor) && timeout.equals(other.timeout); } diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java index 126e8dd4641..f1ba90400dc 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.sql.protocol.shared; import java.io.DataInput; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; import java.util.Objects; /** @@ -17,9 +15,9 @@ import java.util.Objects; */ public abstract class AbstractQueryResponse extends Response { private final long tookNanos; - private final byte[] cursor; + private final String cursor; - protected AbstractQueryResponse(long tookNanos, byte[] cursor) { + protected AbstractQueryResponse(long tookNanos, String cursor) { if (cursor == null) { throw new IllegalArgumentException("cursor must not be null"); } @@ -29,15 +27,13 @@ public abstract class AbstractQueryResponse extends Response { protected AbstractQueryResponse(Request request, DataInput in) throws IOException { tookNanos = in.readLong(); - cursor = new byte[ProtoUtil.readArraySize(in)]; - in.readFully(cursor); + cursor = in.readUTF(); } @Override protected void writeTo(SqlDataOutput out) throws IOException { out.writeLong(tookNanos); - out.writeInt(cursor.length); - out.write(cursor); + out.writeUTF(cursor); } /** @@ -52,7 +48,7 @@ public abstract class AbstractQueryResponse extends Response { * Cursor for fetching the next page. If it has {@code length = 0} * then there is no next page. */ - public byte[] cursor() { + public String cursor() { return cursor; } @@ -61,9 +57,7 @@ public abstract class AbstractQueryResponse extends Response { StringBuilder b = new StringBuilder(); b.append("tookNanos=[").append(tookNanos); b.append("] cursor=["); - for (int i = 0; i < cursor.length; i++) { - b.append(String.format(Locale.ROOT, "%02x", cursor[i])); - } + b.append(cursor); b.append("]"); return b.toString(); } @@ -75,11 +69,11 @@ public abstract class AbstractQueryResponse extends Response { } AbstractQueryResponse other = (AbstractQueryResponse) obj; return tookNanos == other.tookNanos - && Arrays.equals(cursor, other.cursor); + && Objects.equals(cursor, other.cursor); } @Override public int hashCode() { - return Objects.hash(tookNanos, Arrays.hashCode(cursor)); + return Objects.hash(tookNanos, cursor); } }