From 773cdf0f9f8166e1609ca9fd64a866f39384652d Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Sat, 2 Dec 2017 16:03:22 -0500 Subject: [PATCH] SQL: Switch to the standard way of cursor serialization (elastic/x-pack-elasticsearch#3197) While working on cursor cleanup, I realized that we still have two ways to serialize the cursor and the second way doesn't contain the cursor version (only client version, that can be potentially different from the cursor version). This commit switches to the unified way of serializing the cursor. This is a follow up for elastic/x-pack-elasticsearch#3064. Original commit: elastic/x-pack-elasticsearch@ef1a6427dda0416ff88e37ba57ae6ac71f9ae949 --- .../cli/net/protocol/QueryInitResponse.java | 2 +- .../cli/net/protocol/QueryPageRequest.java | 2 +- .../cli/net/protocol/QueryPageResponse.java | 2 +- .../sql/cli/net/protocol/QueryResponse.java | 2 +- .../net/protocol/QueryInitResponseTests.java | 5 +- .../net/protocol/QueryPageRequestTests.java | 5 +- .../net/protocol/QueryPageResponseTests.java | 5 +- .../org/elasticsearch/xpack/sql/cli/Cli.java | 2 +- .../xpack/sql/cli/CliHttpClient.java | 2 +- .../xpack/sql/cli/ResponseToStringTests.java | 4 +- .../jdbc/net/protocol/QueryInitResponse.java | 2 +- .../jdbc/net/protocol/QueryPageRequest.java | 2 +- .../jdbc/net/protocol/QueryPageResponse.java | 2 +- .../net/protocol/QueryInitResponseTests.java | 5 +- .../net/protocol/QueryPageRequestTests.java | 5 +- .../net/protocol/QueryPageResponseTests.java | 5 +- .../sql/jdbc/net/client/DefaultCursor.java | 6 +- .../sql/jdbc/net/client/JdbcHttpClient.java | 2 +- .../xpack/sql/plugin/CliFormatterCursor.java | 8 ++ .../xpack/sql/plugin/JdbcCursor.java | 73 +++++++++++++++++++ .../xpack/sql/plugin/RestSqlCliAction.java | 36 +++------ .../xpack/sql/plugin/RestSqlJdbcAction.java | 40 +++------- .../sql/plugin/sql/rest/RestSqlAction.java | 8 +- .../xpack/sql/session/Cursor.java | 8 ++ .../shared/AbstractQueryPageRequest.java | 20 ++--- .../shared/AbstractQueryResponse.java | 22 ++---- 26 files changed, 149 insertions(+), 126 deletions(-) create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java index 2559365e8a7..18cde69a15a 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponse.java @@ -13,7 +13,7 @@ import java.io.DataInput; import java.io.IOException; public class QueryInitResponse extends QueryResponse { - public QueryInitResponse(long tookNanos, byte[] cursor, String data) { + public QueryInitResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor, data); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java index 5e8b45c3ac7..e30417bc58e 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequest.java @@ -13,7 +13,7 @@ import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput; import java.io.IOException; public class QueryPageRequest extends AbstractQueryPageRequest { - public QueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + public QueryPageRequest(String cursor, TimeoutInfo timeout) { super(cursor, timeout); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java index eaa696218f5..5bce5284460 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponse.java @@ -13,7 +13,7 @@ import java.io.DataInput; import java.io.IOException; public class QueryPageResponse extends QueryResponse { - public QueryPageResponse(long tookNanos, byte[] cursor, String data) { + public QueryPageResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor, data); } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java index 581af4ab69b..1cd48f212d0 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryResponse.java @@ -16,7 +16,7 @@ import java.util.Objects; public abstract class QueryResponse extends AbstractQueryResponse { public final String data; - protected QueryResponse(long tookNanos, byte[] cursor, String data) { + protected QueryResponse(long tookNanos, String cursor, String data) { super(tookNanos, cursor); if (data == null) { throw new IllegalArgumentException("data cannot be null"); diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java index 54d67faa0a6..8facca8b723 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryInitResponseTests.java @@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests public class QueryInitResponseTests extends ESTestCase { static QueryInitResponse randomQueryInitResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryInitResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); } @@ -25,6 +24,6 @@ public class QueryInitResponseTests extends ESTestCase { public void testToString() { assertEquals("QueryInitResponse", - new QueryInitResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + new QueryInitResponse(123, "0103", "test").toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java index 0dc3112857c..8680b848046 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageRequestTests.java @@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils public class QueryPageRequestTests extends ESTestCase { static QueryPageRequest randomQueryPageRequest() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageRequest(cursor, randomTimeoutInfo()); } @@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase { } public void testToString() { - assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1)).toString()); + assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1)).toString()); } } diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java index b6e278ff192..bd7413b6eda 100644 --- a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryPageResponseTests.java @@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequestTests public class QueryPageResponseTests extends ESTestCase { static QueryPageResponse randomQueryPageResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5)); } @@ -25,6 +24,6 @@ public class QueryPageResponseTests extends ESTestCase { public void testToString() { assertEquals("QueryPageResponse", - new QueryPageResponse(123, new byte[] {0x01, 0x03}, "test").toString()); + new QueryPageResponse(123, "0103", "test").toString()); } } diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java index 1fb37e43be8..c187e7a7424 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java @@ -309,7 +309,7 @@ public class Cli { while (true) { term.writer().print(ResponseToString.toAnsi(response).toAnsi(term)); term.writer().flush(); - if (response.cursor().length == 0) { + if (response.cursor().isEmpty()) { // Successfully finished the entire query! return; } diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java index 46bd57095e6..4e8d6d5766d 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java @@ -42,7 +42,7 @@ public class CliHttpClient { return (QueryResponse) post(request); } - public QueryResponse nextPage(byte[] cursor) throws SQLException { + public QueryResponse nextPage(String cursor) throws SQLException { QueryPageRequest request = new QueryPageRequest(cursor, timeout()); return (QueryResponse) post(request); } diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java index 4577953432c..fa065cc3a53 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/ResponseToStringTests.java @@ -16,13 +16,13 @@ import static org.mockito.Mockito.when; public class ResponseToStringTests extends ESTestCase { public void testQueryInitResponse() { - AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, new byte[0], "some command response")); + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, "", "some command response")); assertEquals("some command response", unstyled(s)); assertEquals("[37msome command response[0m", fullyStyled(s)); } public void testQueryPageResponse() { - AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, new byte[0], "some command response")); + AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, "", "some command response")); assertEquals("some command response", unstyled(s)); assertEquals("[37msome command response[0m", fullyStyled(s)); } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java index 0d98156fab7..b21715f88be 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponse.java @@ -23,7 +23,7 @@ public class QueryInitResponse extends AbstractQueryResponse { public final List columns; public final Payload data; - public QueryInitResponse(long tookNanos, byte[] cursor, List columns, Payload data) { + public QueryInitResponse(long tookNanos, String cursor, List columns, Payload data) { super(tookNanos, cursor); this.columns = columns; this.data = data; diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java index 16ece0cb961..9d1a9b56488 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequest.java @@ -16,7 +16,7 @@ import java.io.IOException; public class QueryPageRequest extends AbstractQueryPageRequest { private final transient Payload data; - public QueryPageRequest(byte[] cursor, TimeoutInfo timeout, @Nullable Payload data) { + public QueryPageRequest(String cursor, TimeoutInfo timeout, @Nullable Payload data) { super(cursor, timeout); this.data = data; } diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java index 9e7040ba41d..27b5ba8c1db 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponse.java @@ -18,7 +18,7 @@ import java.util.Objects; public class QueryPageResponse extends AbstractQueryResponse { private final Payload data; - public QueryPageResponse(long tookNanos, byte[] cursor, Payload data) { + public QueryPageResponse(long tookNanos, String cursor, Payload data) { super(tookNanos, cursor); this.data = data; } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java index f819d2dcd37..0a83ea15c41 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryInitResponseTests.java @@ -16,8 +16,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.PageTests.randomPage public class QueryInitResponseTests extends ESTestCase { static QueryInitResponse randomQueryInitResponse() { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); Page page = randomPage(); return new QueryInitResponse(randomNonNegativeLong(), cursor, page.columnInfo(), page); } @@ -33,6 +32,6 @@ public class QueryInitResponseTests extends ESTestCase { }); assertEquals("QueryInitResponse] data=[" + "\ntest\nstring\n]>", - new QueryInitResponse(123, new byte[] {0x01, 0x20}, page.columnInfo(), page).toString()); + new QueryInitResponse(123, "0120", page.columnInfo(), page).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java index 0f4a33dbe09..189398b82d9 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageRequestTests.java @@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUti public class QueryPageRequestTests extends ESTestCase { static QueryPageRequest randomQueryPageRequest(Page page) { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageRequest(cursor, randomTimeoutInfo(), page); } @@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase { } public void testToString() { - assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1), null).toString()); + assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1), null).toString()); } } diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java index 538e44c1b97..1e8f38ba97c 100644 --- a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryPageResponseTests.java @@ -17,8 +17,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequestTest public class QueryPageResponseTests extends ESTestCase { static QueryPageResponse randomQueryPageResponse(Page page) { - byte[] cursor = new byte[between(0, 5)]; - random().nextBytes(cursor); + String cursor = randomAlphaOfLength(10); return new QueryPageResponse(randomNonNegativeLong(), cursor, page); } @@ -32,6 +31,6 @@ public class QueryPageResponseTests extends ESTestCase { new Object[] {"test"} }); assertEquals("QueryPageResponse", - new QueryPageResponse(123, new byte[] {0x08, 0x10}, results).toString()); + new QueryPageResponse(123, "0810", results).toString()); } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java index de279346c06..02cc8aa8d9c 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java @@ -18,9 +18,9 @@ class DefaultCursor implements Cursor { private final Page page; private int row = -1; - private byte[] cursor; + private String cursor; - DefaultCursor(JdbcHttpClient client, byte[] cursor, Page page, RequestMeta meta) { + DefaultCursor(JdbcHttpClient client, String cursor, Page page, RequestMeta meta) { this.client = client; this.meta = meta; this.cursor = cursor; @@ -39,7 +39,7 @@ class DefaultCursor implements Cursor { return true; } else { - if (cursor.length != 0) { + if (cursor.isEmpty() == false) { cursor = client.nextPage(cursor, page, meta); row = -1; return next(); diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java index aebb7ecabce..13638db33cc 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java @@ -64,7 +64,7 @@ public class JdbcHttpClient { * Read the next page of results, updating the {@link Page} and returning * the scroll id to use to fetch the next page. */ - public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException { + public String nextPage(String cursor, Page page, RequestMeta meta) throws SQLException { QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page); return ((QueryPageResponse) http.post(request)).cursor(); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java index bc0d487727a..1673da80636 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java @@ -24,6 +24,13 @@ public class CliFormatterCursor implements Cursor { private Cursor delegate; private CliFormatter formatter; + public static Cursor wrap(Cursor newCursor, CliFormatter formatter) { + if (newCursor == EMPTY) { + return EMPTY; + } + return new CliFormatterCursor(newCursor, formatter); + } + public CliFormatterCursor(Cursor delegate, CliFormatter formatter) { this.delegate = delegate; this.formatter = formatter; @@ -39,6 +46,7 @@ public class CliFormatterCursor implements Cursor { out.writeNamedWriteable(delegate); formatter.writeTo(out); } + public CliFormatter getCliFormatter() { return formatter; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java new file mode 100644 index 00000000000..cdb370fa291 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.RowSet; + +import java.io.IOException; +import java.sql.JDBCType; +import java.util.ArrayList; +import java.util.List; + +/** + * The cursor that wraps all necessary information for jdbc + */ +public class JdbcCursor implements Cursor { + public static final String NAME = "j"; + private Cursor delegate; + private List types; + + + public static Cursor wrap(Cursor newCursor, List types) { + if (newCursor == EMPTY) { + return EMPTY; + } + return new JdbcCursor(newCursor, types); + } + + public JdbcCursor(Cursor delegate, List types) { + this.delegate = delegate; + this.types = types; + } + + public JdbcCursor(StreamInput in) throws IOException { + delegate = in.readNamedWriteable(Cursor.class); + int size = in.readVInt(); + types = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + types.add(JDBCType.valueOf(in.readVInt())); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(delegate); + out.writeVInt(types.size()); + for (JDBCType type : types) { + out.writeVInt(type.getVendorTypeNumber()); + } + } + + public List getTypes() { + return types; + } + + @Override + public void nextPage(Configuration cfg, Client client, ActionListener listener) { + delegate.nextPage(cfg, client, listener); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java index 39d653f9839..26d162ddad3 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java @@ -5,14 +5,10 @@ */ package org.elasticsearch.xpack.sql.plugin; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.main.MainRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestChannel; @@ -83,19 +79,17 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { CliFormatter formatter = new CliFormatter(response); String data = formatter.formatWithHeader(response); - return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + return new QueryInitResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data); })); } private Consumer queryPage(Client client, QueryPageRequest request) { - Cursor cursor; - CliFormatter formatter; - try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) { - cursor = in.readNamedWriteable(Cursor.class); - formatter = new CliFormatter(in); - } catch (IOException e) { - throw new IllegalArgumentException("error reading the cursor"); + Cursor cursor = Cursor.decodeFromString(request.cursor); + if (cursor instanceof CliFormatterCursor == false) { + throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]"); } + CliFormatter formatter = ((CliFormatterCursor)cursor).getCliFormatter(); SqlRequest sqlRequest = new SqlRequest("", null, SqlRequest.DEFAULT_TIME_ZONE, 0, TimeValue.timeValueMillis(request.timeout.requestTimeout), TimeValue.timeValueMillis(request.timeout.pageTimeout), @@ -104,20 +98,8 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { long start = System.nanoTime(); return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> { String data = formatter.formatWithoutHeader(response); - return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data); + return new QueryPageResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data); })); } - - private static byte[] serializeCursor(Cursor cursor, CliFormatter formatter) { - if (cursor == Cursor.EMPTY) { - return new byte[0]; - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeNamedWriteable(cursor); - formatter.writeTo(out); - return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes; - } catch (IOException e) { - throw new RuntimeException("unexpected trouble building the cursor", e); - } - } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java index 82ade18a019..56d3e979b36 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java @@ -5,16 +5,12 @@ */ package org.elasticsearch.xpack.sql.plugin; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.main.MainRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestChannel; @@ -163,7 +159,6 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { } private Consumer queryInit(Client client, QueryInitRequest request) { - SqlRequest sqlRequest = new SqlRequest(request.query, null, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, TimeValue.timeValueMillis(request.timeout.requestTimeout), TimeValue.timeValueMillis(request.timeout.pageTimeout), @@ -176,20 +171,18 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { types.add(info.jdbcType()); columns.add(new ColumnInfo(info.name(), info.jdbcType(), EMPTY, EMPTY, EMPTY, EMPTY, info.displaySize())); } - return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), columns, + return new QueryInitResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), columns, new SqlResponsePayload(types, response.rows())); })); } private Consumer queryPage(Client client, QueryPageRequest request) { - Cursor cursor; - List types; - try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) { - cursor = in.readNamedWriteable(Cursor.class); - types = in.readList(r -> JDBCType.valueOf(r.readVInt())); - } catch (IOException e) { - throw new IllegalArgumentException("error reading the cursor"); + Cursor cursor = Cursor.decodeFromString(request.cursor); + if (cursor instanceof JdbcCursor == false) { + throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]"); } + List types = ((JdbcCursor)cursor).getTypes(); // NB: the timezone and page size are locked already by the query so pass in defaults (as they are not read anyway) SqlRequest sqlRequest = new SqlRequest(EMPTY, null, SqlRequest.DEFAULT_TIME_ZONE, 0, TimeValue.timeValueMillis(request.timeout.requestTimeout), @@ -197,23 +190,8 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { cursor); long start = System.nanoTime(); return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, - response -> new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), + response -> new QueryPageResponse(System.nanoTime() - start, + Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), new SqlResponsePayload(types, response.rows())))); } - - private static byte[] serializeCursor(Cursor cursor, List types) { - if (cursor == Cursor.EMPTY) { - return new byte[0]; - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeNamedWriteable(cursor); - out.writeVInt(types.size()); - for (JDBCType type : types) { - out.writeVInt(type.getVendorTypeNumber()); - } - return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes; - } catch (IOException e) { - throw new RuntimeException("unexpected trouble building the cursor", e); - } - } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java index 1e331997b68..9418dbb66af 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/rest/RestSqlAction.java @@ -66,13 +66,7 @@ public class RestSqlAction extends BaseRestHandler { data = formatter.formatWithHeader(response); } - final Cursor responseCursor; - if (response.cursor() == Cursor.EMPTY) { - responseCursor = Cursor.EMPTY; - } else { - responseCursor = new CliFormatterCursor(response.cursor(), formatter); - } - return buildTextResponse(responseCursor, System.nanoTime() - startNanos, data); + return buildTextResponse(CliFormatterCursor.wrap(response.cursor(), formatter), System.nanoTime() - startNanos, data); } }); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index 4a438e7aeec..b13ccaf570a 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; import org.elasticsearch.xpack.sql.plugin.CliFormatterCursor; +import org.elasticsearch.xpack.sql.plugin.JdbcCursor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -50,6 +51,7 @@ public interface Cursor extends NamedWriteable { entries.add(new NamedWriteableRegistry.Entry(Cursor.class, EmptyCursor.NAME, in -> EMPTY)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CliFormatterCursor.NAME, CliFormatterCursor::new)); + entries.add(new NamedWriteableRegistry.Entry(Cursor.class, JdbcCursor.NAME, JdbcCursor::new)); return entries; } @@ -57,6 +59,9 @@ public interface Cursor extends NamedWriteable { * Write a {@linkplain Cursor} to a string for serialization across xcontent. */ static String encodeToString(Version version, Cursor info) { + if(info == EMPTY) { + return ""; + } try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) { @@ -74,6 +79,9 @@ public interface Cursor extends NamedWriteable { * Read a {@linkplain Cursor} from a string. */ static Cursor decodeFromString(String info) { + if (info.isEmpty()) { + return EMPTY; + } byte[] bytes = info.getBytes(StandardCharsets.UTF_8); try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new ByteArrayInputStream(bytes))); StreamInput in = new NamedWriteableAwareStreamInput(delegate, CURSOR_REGISTRY)) { diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java index f939ccb0fd8..cf4ce9c2968 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryPageRequest.java @@ -6,15 +6,13 @@ package org.elasticsearch.xpack.sql.protocol.shared; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; import java.util.Objects; public abstract class AbstractQueryPageRequest extends Request { - public final byte[] cursor; + public final String cursor; public final TimeoutInfo timeout; - protected AbstractQueryPageRequest(byte[] cursor, TimeoutInfo timeout) { + protected AbstractQueryPageRequest(String cursor, TimeoutInfo timeout) { if (cursor == null) { throw new IllegalArgumentException("[cursor] must not be null"); } @@ -26,25 +24,19 @@ public abstract class AbstractQueryPageRequest extends Request { } protected AbstractQueryPageRequest(SqlDataInput in) throws IOException { - this.cursor = new byte[ProtoUtil.readArraySize(in)]; - in.readFully(cursor); + this.cursor = in.readUTF(); this.timeout = new TimeoutInfo(in); } @Override public void writeTo(SqlDataOutput out) throws IOException { - out.writeInt(cursor.length); - out.write(cursor); + out.writeUTF(cursor); timeout.writeTo(out); } @Override protected String toStringBody() { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < cursor.length; i++) { - b.append(String.format(Locale.ROOT, "%02x", cursor[i])); - } - return b.toString(); + return cursor; } @Override @@ -53,7 +45,7 @@ public abstract class AbstractQueryPageRequest extends Request { return false; } AbstractQueryPageRequest other = (AbstractQueryPageRequest) obj; - return Arrays.equals(cursor, other.cursor) + return Objects.equals(cursor, other.cursor) && timeout.equals(other.timeout); } diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java index 126e8dd4641..f1ba90400dc 100644 --- a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryResponse.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.sql.protocol.shared; import java.io.DataInput; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; import java.util.Objects; /** @@ -17,9 +15,9 @@ import java.util.Objects; */ public abstract class AbstractQueryResponse extends Response { private final long tookNanos; - private final byte[] cursor; + private final String cursor; - protected AbstractQueryResponse(long tookNanos, byte[] cursor) { + protected AbstractQueryResponse(long tookNanos, String cursor) { if (cursor == null) { throw new IllegalArgumentException("cursor must not be null"); } @@ -29,15 +27,13 @@ public abstract class AbstractQueryResponse extends Response { protected AbstractQueryResponse(Request request, DataInput in) throws IOException { tookNanos = in.readLong(); - cursor = new byte[ProtoUtil.readArraySize(in)]; - in.readFully(cursor); + cursor = in.readUTF(); } @Override protected void writeTo(SqlDataOutput out) throws IOException { out.writeLong(tookNanos); - out.writeInt(cursor.length); - out.write(cursor); + out.writeUTF(cursor); } /** @@ -52,7 +48,7 @@ public abstract class AbstractQueryResponse extends Response { * Cursor for fetching the next page. If it has {@code length = 0} * then there is no next page. */ - public byte[] cursor() { + public String cursor() { return cursor; } @@ -61,9 +57,7 @@ public abstract class AbstractQueryResponse extends Response { StringBuilder b = new StringBuilder(); b.append("tookNanos=[").append(tookNanos); b.append("] cursor=["); - for (int i = 0; i < cursor.length; i++) { - b.append(String.format(Locale.ROOT, "%02x", cursor[i])); - } + b.append(cursor); b.append("]"); return b.toString(); } @@ -75,11 +69,11 @@ public abstract class AbstractQueryResponse extends Response { } AbstractQueryResponse other = (AbstractQueryResponse) obj; return tookNanos == other.tookNanos - && Arrays.equals(cursor, other.cursor); + && Objects.equals(cursor, other.cursor); } @Override public int hashCode() { - return Objects.hash(tookNanos, Arrays.hashCode(cursor)); + return Objects.hash(tookNanos, cursor); } }