From 3e14c30aa1f7d120d0a52b35ac609c1d18384c63 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 21 Nov 2017 11:24:27 -0500 Subject: [PATCH] SQL: Add version to cursor serialization (elastic/x-pack-elasticsearch#3064) This commits also simplifies the serialization mechanism by remove 2 ways to serialize the cursor. Adding the version there was complicating things too much otherwise. Original commit: elastic/x-pack-elasticsearch@4f2c541e0a0e8d0562430d4e9a372328cfb04731 --- .../sql/execution/search/ScrollCursor.java | 53 +--------------- .../plugin/AbstractSqlProtocolRestAction.java | 17 +---- .../sql/plugin/sql/action/SqlResponse.java | 3 +- .../xpack/sql/session/Cursor.java | 63 ++++++++++--------- .../xpack/sql/session/EmptyCursor.java | 5 -- .../execution/search/ScrollCursorTests.java | 12 +--- .../plugin/sql/action/SqlResponseTests.java | 17 ++++- 7 files changed, 57 insertions(+), 113 deletions(-) diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index f9b3e3f236b..732f1f86145 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -10,31 +10,19 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; -import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Base64; import java.util.List; import java.util.Objects; public class ScrollCursor implements Cursor { public static final String NAME = "s"; - /** - * {@link NamedWriteableRegistry} used to resolve the {@link #extractors}. - */ - private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(HitExtractors.getNamedWriteables()); private final String scrollId; private final List extractors; @@ -59,45 +47,6 @@ public class ScrollCursor implements Cursor { out.writeVInt(limit); } - public ScrollCursor(java.io.Reader reader) throws IOException { - StringBuffer scrollId = new StringBuffer(); - int c; - while ((c = reader.read()) != -1 && c != ':') { - scrollId.append((char) c); - } - this.scrollId = scrollId.toString(); - if (c == -1) { - throw new IllegalArgumentException("invalid cursor"); - } - try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new InputStream() { - @Override - public int read() throws IOException { - int c = reader.read(); - if (c < -1 || c > 0xffff) { - throw new IllegalArgumentException("invalid cursor [" + Integer.toHexString(c) + "]"); - } - return c; - } - })); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) { - extractors = in.readNamedWriteableList(HitExtractor.class); - limit = in.readVInt(); - } - } - - @Override - public void writeTo(java.io.Writer writer) throws IOException { - writer.write(scrollId); - writer.write(':'); - try (StreamOutput out = new OutputStreamStreamOutput(Base64.getEncoder().wrap(new OutputStream() { - @Override - public void write(int b) throws IOException { - writer.write(b); - } - }))) { - out.writeNamedWriteableList(extractors); - out.writeVInt(limit); - } - } @Override public String getWriteableName() { @@ -132,6 +81,6 @@ public class ScrollCursor implements Cursor { @Override public String toString() { - return "cursor for scoll [" + scrollId + "]"; + return "cursor for scroll [" + scrollId + "]"; } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java index 85f7f04a14a..b763fe7f1dd 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/AbstractSqlProtocolRestAction.java @@ -5,11 +5,9 @@ */ package org.elasticsearch.xpack.sql.plugin; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; @@ -17,19 +15,11 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestRequest.Method; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestResponseListener; -import org.elasticsearch.xpack.sql.ClientSqlException; -import org.elasticsearch.xpack.sql.analysis.AnalysisException; -import org.elasticsearch.xpack.sql.analysis.catalog.MappingException; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto; -import org.elasticsearch.xpack.sql.parser.ParsingException; -import org.elasticsearch.xpack.sql.planner.PlanningException; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractErrorResponse; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse; import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto; -import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType; import org.elasticsearch.xpack.sql.protocol.shared.Request; import org.elasticsearch.xpack.sql.protocol.shared.Response; import org.elasticsearch.xpack.sql.session.Cursor; @@ -37,9 +27,6 @@ import org.elasticsearch.xpack.sql.session.Cursor; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; @@ -47,7 +34,7 @@ import static org.elasticsearch.rest.RestStatus.OK; import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY; public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler { - protected static final NamedWriteableRegistry CURSOR_REGISTRY = new NamedWriteableRegistry(Cursor.getNamedWriteables()); + public static final NamedWriteableRegistry CURSOR_REGISTRY = new NamedWriteableRegistry(Cursor.getNamedWriteables()); private final AbstractProto proto; protected AbstractSqlProtocolRestAction(Settings settings, AbstractProto proto) { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java index c28b43c13be..0880e7b3e9e 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.sql.plugin.sql.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -138,7 +139,7 @@ public class SqlResponse extends ActionResponse implements ToXContentObject { builder.endArray(); if (cursor != Cursor.EMPTY) { - builder.field(SqlRequest.CURSOR.getPreferredName(), Cursor.encodeToString(cursor)); + builder.field(SqlRequest.CURSOR.getPreferredName(), Cursor.encodeToString(Version.CURRENT, cursor)); } } return builder.endObject(); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index 052759e6327..87fab5fad14 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -5,19 +5,30 @@ */ package org.elasticsearch.xpack.sql.session; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.common.io.FastStringReader; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.StringWriter; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Base64; import java.util.List; +import static org.elasticsearch.xpack.sql.plugin.AbstractSqlProtocolRestAction.CURSOR_REGISTRY; + /** * Information required to access the next page of response. */ @@ -28,10 +39,6 @@ public interface Cursor extends NamedWriteable { * Request the next page of data. */ void nextPage(Configuration cfg, Client client, ActionListener listener); - /** - * Write the {@linkplain Cursor} to a String for serialization over xcontent. - */ - void writeTo(java.io.Writer writer) throws IOException; /** * The {@link NamedWriteable}s required to deserialize {@link Cursor}s. @@ -47,37 +54,35 @@ public interface Cursor extends NamedWriteable { /** * Write a {@linkplain Cursor} to a string for serialization across xcontent. */ - static String encodeToString(Cursor info) { - StringWriter writer = new StringWriter(); - try { - writer.write(info.getWriteableName()); - info.writeTo(writer); - } catch (IOException e) { - throw new RuntimeException("unexpected failure converting next page info to a string", e); + static String encodeToString(Version version, Cursor info) { + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + try (OutputStream base64 = Base64.getEncoder().wrap(os); + StreamOutput out = new OutputStreamStreamOutput(base64)) { + Version.writeVersion(version, out); + out.writeNamedWriteable(info); + } + return os.toString(StandardCharsets.UTF_8.name()); + } catch (IOException ex) { + throw new RuntimeException("unexpected failure converting next page info to a string", ex); } - return writer.toString(); } + /** * Read a {@linkplain Cursor} from a string. */ static Cursor decodeFromString(String info) { - // TODO version compatibility - /* We need to encode minimum version across the cluster and use that - * to handle changes to this protocol across versions. */ - String name = info.substring(0, 1); - try (java.io.Reader reader = new FastStringReader(info)) { - reader.skip(1); - switch (name) { - case EmptyCursor.NAME: - throw new RuntimeException("empty cursor shouldn't be encoded to a string"); - case ScrollCursor.NAME: - return new ScrollCursor(reader); - default: - throw new RuntimeException("unknown cursor type [" + name + "]"); + byte[] bytes = info.getBytes(StandardCharsets.UTF_8); + try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new ByteArrayInputStream(bytes))); + StreamInput in = new NamedWriteableAwareStreamInput(delegate, CURSOR_REGISTRY)) { + Version version = Version.readVersion(in); + if (version.after(Version.CURRENT)) { + throw new RuntimeException("Unsupported scroll version " + version); } - } catch (IOException e) { - throw new RuntimeException("unexpected failure deconding cursor", e); + in.setVersion(version); + return in.readNamedWriteable(Cursor.class); + } catch (IOException ex) { + throw new RuntimeException("unexpected failure deconding cursor", ex); } } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java index 10db79f42f6..9da75b3b6b0 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java @@ -29,11 +29,6 @@ class EmptyCursor implements Cursor { return NAME; } - @Override - public void writeTo(java.io.Writer writer) throws IOException { - throw new IOException("no next page should not be converted to or from a string"); - } - @Override public void nextPage(Configuration cfg, Client client, ActionListener listener) { throw new IllegalArgumentException("there is no next page"); diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index bb819fc6d34..4982e9fa953 100644 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.execution.search; import org.elasticsearch.Version; -import org.elasticsearch.common.io.FastStringReader; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.test.AbstractWireSerializingTestCase; @@ -17,9 +16,9 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; import org.elasticsearch.xpack.sql.execution.search.extractor.InnerHitExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.ProcessingHitExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.SourceExtractorTests; +import org.elasticsearch.xpack.sql.session.Cursor; import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -68,13 +67,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase { } if (testInstance.cursor() != Cursor.EMPTY) { - assertEquals(rootMap.get(SqlRequest.CURSOR.getPreferredName()), Cursor.encodeToString(testInstance.cursor())); + assertEquals(rootMap.get(SqlRequest.CURSOR.getPreferredName()), Cursor.encodeToString(Version.CURRENT, testInstance.cursor())); } } + + public void testVersionHandling() { + Cursor cursor = randomCursor(); + assertEquals(cursor, Cursor.decodeFromString(Cursor.encodeToString(Version.CURRENT, cursor))); + + Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000); + + String encodedWithWrongVersion = Cursor.encodeToString(nextMinorVersion, cursor); + RuntimeException exception = expectThrows(RuntimeException.class, () -> { + Cursor.decodeFromString(encodedWithWrongVersion); + }); + + assertEquals(exception.getMessage(), "Unsupported scroll version " + nextMinorVersion); + } }