SQL: Switch to the standard way of cursor serialization (elastic/x-pack-elasticsearch#3197)
While working on cursor cleanup, I realized that we still have two ways to serialize the cursor and the second way doesn't contain the cursor version (only client version, that can be potentially different from the cursor version). This commit switches to the unified way of serializing the cursor. This is a follow up for elastic/x-pack-elasticsearch#3064. Original commit: elastic/x-pack-elasticsearch@ef1a6427dd
This commit is contained in:
parent
5c6e5c1cb5
commit
773cdf0f9f
|
@ -13,7 +13,7 @@ import java.io.DataInput;
|
|||
import java.io.IOException;
|
||||
|
||||
public class QueryInitResponse extends QueryResponse {
|
||||
public QueryInitResponse(long tookNanos, byte[] cursor, String data) {
|
||||
public QueryInitResponse(long tookNanos, String cursor, String data) {
|
||||
super(tookNanos, cursor, data);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
|
|||
import java.io.IOException;
|
||||
|
||||
public class QueryPageRequest extends AbstractQueryPageRequest {
|
||||
public QueryPageRequest(byte[] cursor, TimeoutInfo timeout) {
|
||||
public QueryPageRequest(String cursor, TimeoutInfo timeout) {
|
||||
super(cursor, timeout);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import java.io.DataInput;
|
|||
import java.io.IOException;
|
||||
|
||||
public class QueryPageResponse extends QueryResponse {
|
||||
public QueryPageResponse(long tookNanos, byte[] cursor, String data) {
|
||||
public QueryPageResponse(long tookNanos, String cursor, String data) {
|
||||
super(tookNanos, cursor, data);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ import java.util.Objects;
|
|||
public abstract class QueryResponse extends AbstractQueryResponse {
|
||||
public final String data;
|
||||
|
||||
protected QueryResponse(long tookNanos, byte[] cursor, String data) {
|
||||
protected QueryResponse(long tookNanos, String cursor, String data) {
|
||||
super(tookNanos, cursor);
|
||||
if (data == null) {
|
||||
throw new IllegalArgumentException("data cannot be null");
|
||||
|
|
|
@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests
|
|||
|
||||
public class QueryInitResponseTests extends ESTestCase {
|
||||
static QueryInitResponse randomQueryInitResponse() {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
return new QueryInitResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
|
@ -25,6 +24,6 @@ public class QueryInitResponseTests extends ESTestCase {
|
|||
|
||||
public void testToString() {
|
||||
assertEquals("QueryInitResponse<tookNanos=[123] cursor=[0103] data=[test]>",
|
||||
new QueryInitResponse(123, new byte[] {0x01, 0x03}, "test").toString());
|
||||
new QueryInitResponse(123, "0103", "test").toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils
|
|||
|
||||
public class QueryPageRequestTests extends ESTestCase {
|
||||
static QueryPageRequest randomQueryPageRequest() {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
return new QueryPageRequest(cursor, randomTimeoutInfo());
|
||||
}
|
||||
|
||||
|
@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testToString() {
|
||||
assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1)).toString());
|
||||
assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1)).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,8 +14,7 @@ import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequestTests
|
|||
|
||||
public class QueryPageResponseTests extends ESTestCase {
|
||||
static QueryPageResponse randomQueryPageResponse() {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
return new QueryPageResponse(randomNonNegativeLong(), cursor, randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
|
@ -25,6 +24,6 @@ public class QueryPageResponseTests extends ESTestCase {
|
|||
|
||||
public void testToString() {
|
||||
assertEquals("QueryPageResponse<tookNanos=[123] cursor=[0103] data=[test]>",
|
||||
new QueryPageResponse(123, new byte[] {0x01, 0x03}, "test").toString());
|
||||
new QueryPageResponse(123, "0103", "test").toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -309,7 +309,7 @@ public class Cli {
|
|||
while (true) {
|
||||
term.writer().print(ResponseToString.toAnsi(response).toAnsi(term));
|
||||
term.writer().flush();
|
||||
if (response.cursor().length == 0) {
|
||||
if (response.cursor().isEmpty()) {
|
||||
// Successfully finished the entire query!
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public class CliHttpClient {
|
|||
return (QueryResponse) post(request);
|
||||
}
|
||||
|
||||
public QueryResponse nextPage(byte[] cursor) throws SQLException {
|
||||
public QueryResponse nextPage(String cursor) throws SQLException {
|
||||
QueryPageRequest request = new QueryPageRequest(cursor, timeout());
|
||||
return (QueryResponse) post(request);
|
||||
}
|
||||
|
|
|
@ -16,13 +16,13 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class ResponseToStringTests extends ESTestCase {
|
||||
public void testQueryInitResponse() {
|
||||
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, new byte[0], "some command response"));
|
||||
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryInitResponse(123, "", "some command response"));
|
||||
assertEquals("some command response", unstyled(s));
|
||||
assertEquals("[37msome command response[0m", fullyStyled(s));
|
||||
}
|
||||
|
||||
public void testQueryPageResponse() {
|
||||
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, new byte[0], "some command response"));
|
||||
AttributedStringBuilder s = ResponseToString.toAnsi(new QueryPageResponse(123, "", "some command response"));
|
||||
assertEquals("some command response", unstyled(s));
|
||||
assertEquals("[37msome command response[0m", fullyStyled(s));
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ public class QueryInitResponse extends AbstractQueryResponse {
|
|||
public final List<ColumnInfo> columns;
|
||||
public final Payload data;
|
||||
|
||||
public QueryInitResponse(long tookNanos, byte[] cursor, List<ColumnInfo> columns, Payload data) {
|
||||
public QueryInitResponse(long tookNanos, String cursor, List<ColumnInfo> columns, Payload data) {
|
||||
super(tookNanos, cursor);
|
||||
this.columns = columns;
|
||||
this.data = data;
|
||||
|
|
|
@ -16,7 +16,7 @@ import java.io.IOException;
|
|||
public class QueryPageRequest extends AbstractQueryPageRequest {
|
||||
private final transient Payload data;
|
||||
|
||||
public QueryPageRequest(byte[] cursor, TimeoutInfo timeout, @Nullable Payload data) {
|
||||
public QueryPageRequest(String cursor, TimeoutInfo timeout, @Nullable Payload data) {
|
||||
super(cursor, timeout);
|
||||
this.data = data;
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ import java.util.Objects;
|
|||
public class QueryPageResponse extends AbstractQueryResponse {
|
||||
private final Payload data;
|
||||
|
||||
public QueryPageResponse(long tookNanos, byte[] cursor, Payload data) {
|
||||
public QueryPageResponse(long tookNanos, String cursor, Payload data) {
|
||||
super(tookNanos, cursor);
|
||||
this.data = data;
|
||||
}
|
||||
|
|
|
@ -16,8 +16,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.PageTests.randomPage
|
|||
|
||||
public class QueryInitResponseTests extends ESTestCase {
|
||||
static QueryInitResponse randomQueryInitResponse() {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
Page page = randomPage();
|
||||
return new QueryInitResponse(randomNonNegativeLong(), cursor, page.columnInfo(), page);
|
||||
}
|
||||
|
@ -33,6 +32,6 @@ public class QueryInitResponseTests extends ESTestCase {
|
|||
});
|
||||
assertEquals("QueryInitResponse<tookNanos=[123] cursor=[0120] columns=[a<type=[VARCHAR]>] data=["
|
||||
+ "\ntest\nstring\n]>",
|
||||
new QueryInitResponse(123, new byte[] {0x01, 0x20}, page.columnInfo(), page).toString());
|
||||
new QueryInitResponse(123, "0120", page.columnInfo(), page).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUti
|
|||
|
||||
public class QueryPageRequestTests extends ESTestCase {
|
||||
static QueryPageRequest randomQueryPageRequest(Page page) {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
return new QueryPageRequest(cursor, randomTimeoutInfo(), page);
|
||||
}
|
||||
|
||||
|
@ -25,6 +24,6 @@ public class QueryPageRequestTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testToString() {
|
||||
assertEquals("QueryPageRequest<0320>", new QueryPageRequest(new byte[] {0x03, 0x20}, new TimeoutInfo(1, 1, 1), null).toString());
|
||||
assertEquals("QueryPageRequest<0320>", new QueryPageRequest("0320", new TimeoutInfo(1, 1, 1), null).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,7 @@ import static org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequestTest
|
|||
|
||||
public class QueryPageResponseTests extends ESTestCase {
|
||||
static QueryPageResponse randomQueryPageResponse(Page page) {
|
||||
byte[] cursor = new byte[between(0, 5)];
|
||||
random().nextBytes(cursor);
|
||||
String cursor = randomAlphaOfLength(10);
|
||||
return new QueryPageResponse(randomNonNegativeLong(), cursor, page);
|
||||
}
|
||||
|
||||
|
@ -32,6 +31,6 @@ public class QueryPageResponseTests extends ESTestCase {
|
|||
new Object[] {"test"}
|
||||
});
|
||||
assertEquals("QueryPageResponse<tookNanos=[123] cursor=[0810] data=[\ntest\n]>",
|
||||
new QueryPageResponse(123, new byte[] {0x08, 0x10}, results).toString());
|
||||
new QueryPageResponse(123, "0810", results).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,9 @@ class DefaultCursor implements Cursor {
|
|||
|
||||
private final Page page;
|
||||
private int row = -1;
|
||||
private byte[] cursor;
|
||||
private String cursor;
|
||||
|
||||
DefaultCursor(JdbcHttpClient client, byte[] cursor, Page page, RequestMeta meta) {
|
||||
DefaultCursor(JdbcHttpClient client, String cursor, Page page, RequestMeta meta) {
|
||||
this.client = client;
|
||||
this.meta = meta;
|
||||
this.cursor = cursor;
|
||||
|
@ -39,7 +39,7 @@ class DefaultCursor implements Cursor {
|
|||
return true;
|
||||
}
|
||||
else {
|
||||
if (cursor.length != 0) {
|
||||
if (cursor.isEmpty() == false) {
|
||||
cursor = client.nextPage(cursor, page, meta);
|
||||
row = -1;
|
||||
return next();
|
||||
|
|
|
@ -64,7 +64,7 @@ public class JdbcHttpClient {
|
|||
* Read the next page of results, updating the {@link Page} and returning
|
||||
* the scroll id to use to fetch the next page.
|
||||
*/
|
||||
public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException {
|
||||
public String nextPage(String cursor, Page page, RequestMeta meta) throws SQLException {
|
||||
QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page);
|
||||
return ((QueryPageResponse) http.post(request)).cursor();
|
||||
}
|
||||
|
|
|
@ -24,6 +24,13 @@ public class CliFormatterCursor implements Cursor {
|
|||
private Cursor delegate;
|
||||
private CliFormatter formatter;
|
||||
|
||||
public static Cursor wrap(Cursor newCursor, CliFormatter formatter) {
|
||||
if (newCursor == EMPTY) {
|
||||
return EMPTY;
|
||||
}
|
||||
return new CliFormatterCursor(newCursor, formatter);
|
||||
}
|
||||
|
||||
public CliFormatterCursor(Cursor delegate, CliFormatter formatter) {
|
||||
this.delegate = delegate;
|
||||
this.formatter = formatter;
|
||||
|
@ -39,6 +46,7 @@ public class CliFormatterCursor implements Cursor {
|
|||
out.writeNamedWriteable(delegate);
|
||||
formatter.writeTo(out);
|
||||
}
|
||||
|
||||
public CliFormatter getCliFormatter() {
|
||||
return formatter;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.xpack.sql.session.Configuration;
|
||||
import org.elasticsearch.xpack.sql.session.Cursor;
|
||||
import org.elasticsearch.xpack.sql.session.RowSet;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.JDBCType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The cursor that wraps all necessary information for jdbc
|
||||
*/
|
||||
public class JdbcCursor implements Cursor {
|
||||
public static final String NAME = "j";
|
||||
private Cursor delegate;
|
||||
private List<JDBCType> types;
|
||||
|
||||
|
||||
public static Cursor wrap(Cursor newCursor, List<JDBCType> types) {
|
||||
if (newCursor == EMPTY) {
|
||||
return EMPTY;
|
||||
}
|
||||
return new JdbcCursor(newCursor, types);
|
||||
}
|
||||
|
||||
public JdbcCursor(Cursor delegate, List<JDBCType> types) {
|
||||
this.delegate = delegate;
|
||||
this.types = types;
|
||||
}
|
||||
|
||||
public JdbcCursor(StreamInput in) throws IOException {
|
||||
delegate = in.readNamedWriteable(Cursor.class);
|
||||
int size = in.readVInt();
|
||||
types = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
types.add(JDBCType.valueOf(in.readVInt()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeNamedWriteable(delegate);
|
||||
out.writeVInt(types.size());
|
||||
for (JDBCType type : types) {
|
||||
out.writeVInt(type.getVendorTypeNumber());
|
||||
}
|
||||
}
|
||||
|
||||
public List<JDBCType> getTypes() {
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nextPage(Configuration cfg, Client client, ActionListener<RowSet> listener) {
|
||||
delegate.nextPage(cfg, client, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
|
@ -5,14 +5,10 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.main.MainAction;
|
||||
import org.elasticsearch.action.main.MainRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
|
@ -83,19 +79,17 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
|||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||
CliFormatter formatter = new CliFormatter(response);
|
||||
String data = formatter.formatWithHeader(response);
|
||||
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
||||
return new QueryInitResponse(System.nanoTime() - start,
|
||||
Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data);
|
||||
}));
|
||||
}
|
||||
|
||||
private Consumer<RestChannel> queryPage(Client client, QueryPageRequest request) {
|
||||
Cursor cursor;
|
||||
CliFormatter formatter;
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) {
|
||||
cursor = in.readNamedWriteable(Cursor.class);
|
||||
formatter = new CliFormatter(in);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException("error reading the cursor");
|
||||
Cursor cursor = Cursor.decodeFromString(request.cursor);
|
||||
if (cursor instanceof CliFormatterCursor == false) {
|
||||
throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]");
|
||||
}
|
||||
CliFormatter formatter = ((CliFormatterCursor)cursor).getCliFormatter();
|
||||
SqlRequest sqlRequest = new SqlRequest("", null, SqlRequest.DEFAULT_TIME_ZONE, 0,
|
||||
TimeValue.timeValueMillis(request.timeout.requestTimeout),
|
||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||
|
@ -104,20 +98,8 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
|
|||
long start = System.nanoTime();
|
||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||
String data = formatter.formatWithoutHeader(response);
|
||||
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
|
||||
return new QueryPageResponse(System.nanoTime() - start,
|
||||
Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data);
|
||||
}));
|
||||
}
|
||||
|
||||
private static byte[] serializeCursor(Cursor cursor, CliFormatter formatter) {
|
||||
if (cursor == Cursor.EMPTY) {
|
||||
return new byte[0];
|
||||
}
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeNamedWriteable(cursor);
|
||||
formatter.writeTo(out);
|
||||
return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("unexpected trouble building the cursor", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,16 +5,12 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.main.MainAction;
|
||||
import org.elasticsearch.action.main.MainRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
|
@ -163,7 +159,6 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
|||
}
|
||||
|
||||
private Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
|
||||
|
||||
SqlRequest sqlRequest = new SqlRequest(request.query, null, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize,
|
||||
TimeValue.timeValueMillis(request.timeout.requestTimeout),
|
||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||
|
@ -176,20 +171,18 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
|||
types.add(info.jdbcType());
|
||||
columns.add(new ColumnInfo(info.name(), info.jdbcType(), EMPTY, EMPTY, EMPTY, EMPTY, info.displaySize()));
|
||||
}
|
||||
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), columns,
|
||||
return new QueryInitResponse(System.nanoTime() - start,
|
||||
Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), columns,
|
||||
new SqlResponsePayload(types, response.rows()));
|
||||
}));
|
||||
}
|
||||
|
||||
private Consumer<RestChannel> queryPage(Client client, QueryPageRequest request) {
|
||||
Cursor cursor;
|
||||
List<JDBCType> types;
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) {
|
||||
cursor = in.readNamedWriteable(Cursor.class);
|
||||
types = in.readList(r -> JDBCType.valueOf(r.readVInt()));
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException("error reading the cursor");
|
||||
Cursor cursor = Cursor.decodeFromString(request.cursor);
|
||||
if (cursor instanceof JdbcCursor == false) {
|
||||
throw new IllegalArgumentException("Unexpected cursor type: [" + cursor + "]");
|
||||
}
|
||||
List<JDBCType> types = ((JdbcCursor)cursor).getTypes();
|
||||
// NB: the timezone and page size are locked already by the query so pass in defaults (as they are not read anyway)
|
||||
SqlRequest sqlRequest = new SqlRequest(EMPTY, null, SqlRequest.DEFAULT_TIME_ZONE, 0,
|
||||
TimeValue.timeValueMillis(request.timeout.requestTimeout),
|
||||
|
@ -197,23 +190,8 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
|||
cursor);
|
||||
long start = System.nanoTime();
|
||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel,
|
||||
response -> new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
|
||||
response -> new QueryPageResponse(System.nanoTime() - start,
|
||||
Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)),
|
||||
new SqlResponsePayload(types, response.rows()))));
|
||||
}
|
||||
|
||||
private static byte[] serializeCursor(Cursor cursor, List<JDBCType> types) {
|
||||
if (cursor == Cursor.EMPTY) {
|
||||
return new byte[0];
|
||||
}
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeNamedWriteable(cursor);
|
||||
out.writeVInt(types.size());
|
||||
for (JDBCType type : types) {
|
||||
out.writeVInt(type.getVendorTypeNumber());
|
||||
}
|
||||
return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("unexpected trouble building the cursor", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,13 +66,7 @@ public class RestSqlAction extends BaseRestHandler {
|
|||
data = formatter.formatWithHeader(response);
|
||||
}
|
||||
|
||||
final Cursor responseCursor;
|
||||
if (response.cursor() == Cursor.EMPTY) {
|
||||
responseCursor = Cursor.EMPTY;
|
||||
} else {
|
||||
responseCursor = new CliFormatterCursor(response.cursor(), formatter);
|
||||
}
|
||||
return buildTextResponse(responseCursor, System.nanoTime() - startNanos, data);
|
||||
return buildTextResponse(CliFormatterCursor.wrap(response.cursor(), formatter), System.nanoTime() - startNanos, data);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
|
||||
import org.elasticsearch.xpack.sql.plugin.CliFormatterCursor;
|
||||
import org.elasticsearch.xpack.sql.plugin.JdbcCursor;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -50,6 +51,7 @@ public interface Cursor extends NamedWriteable {
|
|||
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, EmptyCursor.NAME, in -> EMPTY));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CliFormatterCursor.NAME, CliFormatterCursor::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, JdbcCursor.NAME, JdbcCursor::new));
|
||||
return entries;
|
||||
}
|
||||
|
||||
|
@ -57,6 +59,9 @@ public interface Cursor extends NamedWriteable {
|
|||
* Write a {@linkplain Cursor} to a string for serialization across xcontent.
|
||||
*/
|
||||
static String encodeToString(Version version, Cursor info) {
|
||||
if(info == EMPTY) {
|
||||
return "";
|
||||
}
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
try (OutputStream base64 = Base64.getEncoder().wrap(os);
|
||||
StreamOutput out = new OutputStreamStreamOutput(base64)) {
|
||||
|
@ -74,6 +79,9 @@ public interface Cursor extends NamedWriteable {
|
|||
* Read a {@linkplain Cursor} from a string.
|
||||
*/
|
||||
static Cursor decodeFromString(String info) {
|
||||
if (info.isEmpty()) {
|
||||
return EMPTY;
|
||||
}
|
||||
byte[] bytes = info.getBytes(StandardCharsets.UTF_8);
|
||||
try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new ByteArrayInputStream(bytes)));
|
||||
StreamInput in = new NamedWriteableAwareStreamInput(delegate, CURSOR_REGISTRY)) {
|
||||
|
|
|
@ -6,15 +6,13 @@
|
|||
package org.elasticsearch.xpack.sql.protocol.shared;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class AbstractQueryPageRequest extends Request {
|
||||
public final byte[] cursor;
|
||||
public final String cursor;
|
||||
public final TimeoutInfo timeout;
|
||||
|
||||
protected AbstractQueryPageRequest(byte[] cursor, TimeoutInfo timeout) {
|
||||
protected AbstractQueryPageRequest(String cursor, TimeoutInfo timeout) {
|
||||
if (cursor == null) {
|
||||
throw new IllegalArgumentException("[cursor] must not be null");
|
||||
}
|
||||
|
@ -26,25 +24,19 @@ public abstract class AbstractQueryPageRequest extends Request {
|
|||
}
|
||||
|
||||
protected AbstractQueryPageRequest(SqlDataInput in) throws IOException {
|
||||
this.cursor = new byte[ProtoUtil.readArraySize(in)];
|
||||
in.readFully(cursor);
|
||||
this.cursor = in.readUTF();
|
||||
this.timeout = new TimeoutInfo(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(SqlDataOutput out) throws IOException {
|
||||
out.writeInt(cursor.length);
|
||||
out.write(cursor);
|
||||
out.writeUTF(cursor);
|
||||
timeout.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String toStringBody() {
|
||||
StringBuilder b = new StringBuilder();
|
||||
for (int i = 0; i < cursor.length; i++) {
|
||||
b.append(String.format(Locale.ROOT, "%02x", cursor[i]));
|
||||
}
|
||||
return b.toString();
|
||||
return cursor;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,7 +45,7 @@ public abstract class AbstractQueryPageRequest extends Request {
|
|||
return false;
|
||||
}
|
||||
AbstractQueryPageRequest other = (AbstractQueryPageRequest) obj;
|
||||
return Arrays.equals(cursor, other.cursor)
|
||||
return Objects.equals(cursor, other.cursor)
|
||||
&& timeout.equals(other.timeout);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,8 +7,6 @@ package org.elasticsearch.xpack.sql.protocol.shared;
|
|||
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
|
@ -17,9 +15,9 @@ import java.util.Objects;
|
|||
*/
|
||||
public abstract class AbstractQueryResponse extends Response {
|
||||
private final long tookNanos;
|
||||
private final byte[] cursor;
|
||||
private final String cursor;
|
||||
|
||||
protected AbstractQueryResponse(long tookNanos, byte[] cursor) {
|
||||
protected AbstractQueryResponse(long tookNanos, String cursor) {
|
||||
if (cursor == null) {
|
||||
throw new IllegalArgumentException("cursor must not be null");
|
||||
}
|
||||
|
@ -29,15 +27,13 @@ public abstract class AbstractQueryResponse extends Response {
|
|||
|
||||
protected AbstractQueryResponse(Request request, DataInput in) throws IOException {
|
||||
tookNanos = in.readLong();
|
||||
cursor = new byte[ProtoUtil.readArraySize(in)];
|
||||
in.readFully(cursor);
|
||||
cursor = in.readUTF();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeTo(SqlDataOutput out) throws IOException {
|
||||
out.writeLong(tookNanos);
|
||||
out.writeInt(cursor.length);
|
||||
out.write(cursor);
|
||||
out.writeUTF(cursor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -52,7 +48,7 @@ public abstract class AbstractQueryResponse extends Response {
|
|||
* Cursor for fetching the next page. If it has {@code length = 0}
|
||||
* then there is no next page.
|
||||
*/
|
||||
public byte[] cursor() {
|
||||
public String cursor() {
|
||||
return cursor;
|
||||
}
|
||||
|
||||
|
@ -61,9 +57,7 @@ public abstract class AbstractQueryResponse extends Response {
|
|||
StringBuilder b = new StringBuilder();
|
||||
b.append("tookNanos=[").append(tookNanos);
|
||||
b.append("] cursor=[");
|
||||
for (int i = 0; i < cursor.length; i++) {
|
||||
b.append(String.format(Locale.ROOT, "%02x", cursor[i]));
|
||||
}
|
||||
b.append(cursor);
|
||||
b.append("]");
|
||||
return b.toString();
|
||||
}
|
||||
|
@ -75,11 +69,11 @@ public abstract class AbstractQueryResponse extends Response {
|
|||
}
|
||||
AbstractQueryResponse other = (AbstractQueryResponse) obj;
|
||||
return tookNanos == other.tookNanos
|
||||
&& Arrays.equals(cursor, other.cursor);
|
||||
&& Objects.equals(cursor, other.cursor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(tookNanos, Arrays.hashCode(cursor));
|
||||
return Objects.hash(tookNanos, cursor);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue