SQL: Add version to cursor serialization (elastic/x-pack-elasticsearch#3064)

This commits also simplifies the serialization mechanism by remove 2 ways to serialize the cursor. Adding the version there was complicating things too much otherwise.

Original commit: elastic/x-pack-elasticsearch@4f2c541e0a
This commit is contained in:
Igor Motov 2017-11-21 11:24:27 -05:00 committed by GitHub
parent 0d4a91af50
commit 3e14c30aa1
7 changed files with 57 additions and 113 deletions

View File

@ -10,31 +10,19 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
public class ScrollCursor implements Cursor {
public static final String NAME = "s";
/**
* {@link NamedWriteableRegistry} used to resolve the {@link #extractors}.
*/
private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(HitExtractors.getNamedWriteables());
private final String scrollId;
private final List<HitExtractor> extractors;
@ -59,45 +47,6 @@ public class ScrollCursor implements Cursor {
out.writeVInt(limit);
}
public ScrollCursor(java.io.Reader reader) throws IOException {
StringBuffer scrollId = new StringBuffer();
int c;
while ((c = reader.read()) != -1 && c != ':') {
scrollId.append((char) c);
}
this.scrollId = scrollId.toString();
if (c == -1) {
throw new IllegalArgumentException("invalid cursor");
}
try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new InputStream() {
@Override
public int read() throws IOException {
int c = reader.read();
if (c < -1 || c > 0xffff) {
throw new IllegalArgumentException("invalid cursor [" + Integer.toHexString(c) + "]");
}
return c;
}
})); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) {
extractors = in.readNamedWriteableList(HitExtractor.class);
limit = in.readVInt();
}
}
@Override
public void writeTo(java.io.Writer writer) throws IOException {
writer.write(scrollId);
writer.write(':');
try (StreamOutput out = new OutputStreamStreamOutput(Base64.getEncoder().wrap(new OutputStream() {
@Override
public void write(int b) throws IOException {
writer.write(b);
}
}))) {
out.writeNamedWriteableList(extractors);
out.writeVInt(limit);
}
}
@Override
public String getWriteableName() {
@ -132,6 +81,6 @@ public class ScrollCursor implements Cursor {
@Override
public String toString() {
return "cursor for scoll [" + scrollId + "]";
return "cursor for scroll [" + scrollId + "]";
}
}

View File

@ -5,11 +5,9 @@
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
@ -17,19 +15,11 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.xpack.sql.ClientSqlException;
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
import org.elasticsearch.xpack.sql.analysis.catalog.MappingException;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractErrorResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.Cursor;
@ -37,9 +27,6 @@ import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
@ -47,7 +34,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler {
protected static final NamedWriteableRegistry CURSOR_REGISTRY = new NamedWriteableRegistry(Cursor.getNamedWriteables());
public static final NamedWriteableRegistry CURSOR_REGISTRY = new NamedWriteableRegistry(Cursor.getNamedWriteables());
private final AbstractProto proto;
protected AbstractSqlProtocolRestAction(Settings settings, AbstractProto proto) {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -138,7 +139,7 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
builder.endArray();
if (cursor != Cursor.EMPTY) {
builder.field(SqlRequest.CURSOR.getPreferredName(), Cursor.encodeToString(cursor));
builder.field(SqlRequest.CURSOR.getPreferredName(), Cursor.encodeToString(Version.CURRENT, cursor));
}
}
return builder.endObject();

View File

@ -5,19 +5,30 @@
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import static org.elasticsearch.xpack.sql.plugin.AbstractSqlProtocolRestAction.CURSOR_REGISTRY;
/**
* Information required to access the next page of response.
*/
@ -28,10 +39,6 @@ public interface Cursor extends NamedWriteable {
* Request the next page of data.
*/
void nextPage(Configuration cfg, Client client, ActionListener<RowSet> listener);
/**
* Write the {@linkplain Cursor} to a String for serialization over xcontent.
*/
void writeTo(java.io.Writer writer) throws IOException;
/**
* The {@link NamedWriteable}s required to deserialize {@link Cursor}s.
@ -47,37 +54,35 @@ public interface Cursor extends NamedWriteable {
/**
* Write a {@linkplain Cursor} to a string for serialization across xcontent.
*/
static String encodeToString(Cursor info) {
StringWriter writer = new StringWriter();
try {
writer.write(info.getWriteableName());
info.writeTo(writer);
} catch (IOException e) {
throw new RuntimeException("unexpected failure converting next page info to a string", e);
static String encodeToString(Version version, Cursor info) {
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
try (OutputStream base64 = Base64.getEncoder().wrap(os);
StreamOutput out = new OutputStreamStreamOutput(base64)) {
Version.writeVersion(version, out);
out.writeNamedWriteable(info);
}
return os.toString(StandardCharsets.UTF_8.name());
} catch (IOException ex) {
throw new RuntimeException("unexpected failure converting next page info to a string", ex);
}
return writer.toString();
}
/**
* Read a {@linkplain Cursor} from a string.
*/
static Cursor decodeFromString(String info) {
// TODO version compatibility
/* We need to encode minimum version across the cluster and use that
* to handle changes to this protocol across versions. */
String name = info.substring(0, 1);
try (java.io.Reader reader = new FastStringReader(info)) {
reader.skip(1);
switch (name) {
case EmptyCursor.NAME:
throw new RuntimeException("empty cursor shouldn't be encoded to a string");
case ScrollCursor.NAME:
return new ScrollCursor(reader);
default:
throw new RuntimeException("unknown cursor type [" + name + "]");
byte[] bytes = info.getBytes(StandardCharsets.UTF_8);
try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new ByteArrayInputStream(bytes)));
StreamInput in = new NamedWriteableAwareStreamInput(delegate, CURSOR_REGISTRY)) {
Version version = Version.readVersion(in);
if (version.after(Version.CURRENT)) {
throw new RuntimeException("Unsupported scroll version " + version);
}
} catch (IOException e) {
throw new RuntimeException("unexpected failure deconding cursor", e);
in.setVersion(version);
return in.readNamedWriteable(Cursor.class);
} catch (IOException ex) {
throw new RuntimeException("unexpected failure deconding cursor", ex);
}
}
}

View File

@ -29,11 +29,6 @@ class EmptyCursor implements Cursor {
return NAME;
}
@Override
public void writeTo(java.io.Writer writer) throws IOException {
throw new IOException("no next page should not be converted to or from a string");
}
@Override
public void nextPage(Configuration cfg, Client client, ActionListener<RowSet> listener) {
throw new IllegalArgumentException("there is no next page");

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
@ -17,9 +16,9 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
import org.elasticsearch.xpack.sql.execution.search.extractor.InnerHitExtractorTests;
import org.elasticsearch.xpack.sql.execution.search.extractor.ProcessingHitExtractorTests;
import org.elasticsearch.xpack.sql.execution.search.extractor.SourceExtractorTests;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
@ -68,13 +67,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
if (randomBoolean()) {
return super.copyInstance(instance, version);
}
// See comment in NextPageInfo#decodeFromString about versioning
assertEquals(Version.CURRENT, version);
try (StringWriter output = new StringWriter()) {
instance.writeTo(output);
try (java.io.Reader in = new FastStringReader(output.toString())) {
return new ScrollCursor(in);
}
}
return (ScrollCursor)Cursor.decodeFromString(Cursor.encodeToString(version, instance));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -100,7 +101,21 @@ public class SqlResponseTests extends AbstractStreamableTestCase<SqlResponse> {
}
if (testInstance.cursor() != Cursor.EMPTY) {
assertEquals(rootMap.get(SqlRequest.CURSOR.getPreferredName()), Cursor.encodeToString(testInstance.cursor()));
assertEquals(rootMap.get(SqlRequest.CURSOR.getPreferredName()), Cursor.encodeToString(Version.CURRENT, testInstance.cursor()));
}
}
public void testVersionHandling() {
Cursor cursor = randomCursor();
assertEquals(cursor, Cursor.decodeFromString(Cursor.encodeToString(Version.CURRENT, cursor)));
Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
String encodedWithWrongVersion = Cursor.encodeToString(nextMinorVersion, cursor);
RuntimeException exception = expectThrows(RuntimeException.class, () -> {
Cursor.decodeFromString(encodedWithWrongVersion);
});
assertEquals(exception.getMessage(), "Unsupported scroll version " + nextMinorVersion);
}
}