Add versioned DataInput and DataOutput to SQL (elastic/x-pack-elasticsearch#2600)

These wrap `DataInput` and `DataOutput` to add the protocol
version being serialized. This is similar to the mechanism
used by core and it has made adding and removing fields from
the serialization protocol fairly simple.

Original commit: elastic/x-pack-elasticsearch@90b3f1199a
This commit is contained in:
Nik Everett 2017-09-25 11:01:57 -04:00 committed by GitHub
parent 216058035b
commit 63a5ad0b1d
30 changed files with 324 additions and 102 deletions

View File

@ -7,8 +7,8 @@ package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractInfoRequest;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
/**
@ -26,8 +26,8 @@ public class InfoRequest extends AbstractInfoRequest {
super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}
InfoRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
InfoRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override

View File

@ -8,8 +8,8 @@ package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
import java.util.TimeZone;
@ -21,8 +21,8 @@ public class QueryInitRequest extends AbstractQueryInitRequest {
super(query, fetchSize, timeZone, timeout);
}
QueryInitRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
QueryInitRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override

View File

@ -8,8 +8,8 @@ package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
public class QueryPageRequest extends AbstractQueryPageRequest {
@ -17,8 +17,8 @@ public class QueryPageRequest extends AbstractQueryPageRequest {
super(cursor, timeout);
}
QueryPageRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
QueryPageRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override

View File

@ -7,9 +7,9 @@ package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -30,8 +30,8 @@ public abstract class QueryResponse extends AbstractQueryResponse {
}
@Override
protected void writeTo(int clientVersion, DataOutput out) throws IOException {
super.writeTo(clientVersion, out);
protected void writeTo(SqlDataOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(data);
}

View File

@ -7,8 +7,8 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractInfoRequest;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
/**
@ -26,8 +26,8 @@ public class InfoRequest extends AbstractInfoRequest {
super(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}
InfoRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
InfoRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override

View File

@ -7,9 +7,9 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -21,13 +21,13 @@ public class MetaColumnRequest extends Request {
this.columnPattern = columnPattern == null ? "" : columnPattern;
}
MetaColumnRequest(int clientVersion, DataInput in) throws IOException {
MetaColumnRequest(SqlDataInput in) throws IOException {
tablePattern = in.readUTF();
columnPattern = in.readUTF();
}
@Override
protected void writeTo(DataOutput out) throws IOException {
protected void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(tablePattern);
out.writeUTF(columnPattern);
}

View File

@ -9,9 +9,9 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -40,7 +40,7 @@ public class MetaColumnResponse extends Response {
}
@Override
protected void writeTo(int clientVersion, DataOutput out) throws IOException {
protected void writeTo(SqlDataOutput out) throws IOException {
out.writeInt(columns.size());
for (MetaColumnInfo info : columns) {
info.writeTo(out);

View File

@ -7,9 +7,9 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MetaTableRequest extends Request {
@ -22,12 +22,12 @@ public class MetaTableRequest extends Request {
this.pattern = pattern;
}
MetaTableRequest(int clientVersion, DataInput in) throws IOException {
MetaTableRequest(SqlDataInput in) throws IOException {
this.pattern = in.readUTF();
}
@Override
public void writeTo(DataOutput out) throws IOException {
public void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(pattern);
}

View File

@ -9,9 +9,9 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -38,7 +38,7 @@ public class MetaTableResponse extends Response {
}
@Override
public void writeTo(int clientVersion, DataOutput out) throws IOException {
public void writeTo(SqlDataOutput out) throws IOException {
out.writeInt(tables.size());
for (String t : tables) {
out.writeUTF(t);

View File

@ -5,8 +5,9 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.IOException;
import java.lang.reflect.Array;
import java.sql.JDBCType;
@ -37,7 +38,7 @@ public class Page implements Payload {
private int maxRows;
/**
* Build empty, call {@link #readFrom(DataInput)} after to fill it.
* Build empty, call {@link #readFrom(SqlDataInput)} after to fill it.
*/
Page(List<ColumnInfo> columnInfo) {
this.columnInfo = columnInfo;
@ -89,10 +90,8 @@ public class Page implements Payload {
return column(column)[row];
}
/**
* Read a value from the stream
*/
public void readFrom(DataInput in) throws IOException {
@Override
public void readFrom(SqlDataInput in) throws IOException {
int rows = in.readInt();
// this.rows may be less than the number of rows we have space for
if (rows > maxRows) {
@ -107,7 +106,8 @@ public class Page implements Payload {
}
}
public void writeTo(DataOutput out) throws IOException {
@Override
public void writeTo(SqlDataOutput out) throws IOException {
int rows = rows();
out.writeInt(rows);
for (int row = 0; row < rows; row++) {

View File

@ -5,13 +5,14 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.IOException;
public interface Payload {
void readFrom(DataInput in) throws IOException;
void readFrom(SqlDataInput in) throws IOException;
void writeTo(DataOutput out) throws IOException;
void writeTo(SqlDataOutput out) throws IOException;
}

View File

@ -8,8 +8,8 @@ package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
import java.util.TimeZone;
@ -18,8 +18,8 @@ public class QueryInitRequest extends AbstractQueryInitRequest {
super(query, fetchSize, timeZone, timeout);
}
QueryInitRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
QueryInitRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override

View File

@ -9,9 +9,9 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -29,7 +29,7 @@ public class QueryInitResponse extends AbstractQueryResponse {
this.data = data;
}
QueryInitResponse(Request request, DataInput in) throws IOException {
QueryInitResponse(Request request, SqlDataInput in) throws IOException {
super(request, in);
int size = in.readInt();
List<ColumnInfo> columns = new ArrayList<>(size);
@ -44,8 +44,8 @@ public class QueryInitResponse extends AbstractQueryResponse {
}
@Override
public void writeTo(int clientVersion, DataOutput out) throws IOException {
super.writeTo(clientVersion, out);
public void writeTo(SqlDataOutput out) throws IOException {
super.writeTo(out);
out.writeInt(columns.size());
for (ColumnInfo c : columns) {
c.writeTo(out);

View File

@ -9,8 +9,8 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Nullable;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.DataInput;
import java.io.IOException;
public class QueryPageRequest extends AbstractQueryPageRequest {
@ -21,8 +21,8 @@ public class QueryPageRequest extends AbstractQueryPageRequest {
this.data = data;
}
QueryPageRequest(int clientVersion, DataInput in) throws IOException {
super(clientVersion, in);
QueryPageRequest(SqlDataInput in) throws IOException {
super(in);
this.data = null; // data isn't used on the server side
}

View File

@ -9,9 +9,9 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -23,7 +23,7 @@ public class QueryPageResponse extends AbstractQueryResponse {
this.data = data;
}
QueryPageResponse(Request request, DataInput in) throws IOException {
QueryPageResponse(Request request, SqlDataInput in) throws IOException {
super(request, in);
QueryPageRequest queryPageRequest = (QueryPageRequest) request;
data = queryPageRequest.data();
@ -31,8 +31,8 @@ public class QueryPageResponse extends AbstractQueryResponse {
}
@Override
public void writeTo(int clientVersion, DataOutput out) throws IOException {
super.writeTo(clientVersion, out);
public void writeTo(SqlDataOutput out) throws IOException {
super.writeTo(out);
data.writeTo(out);
}

View File

@ -5,8 +5,15 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -50,9 +57,9 @@ public class PageTests extends ESTestCase {
public void testRoundTripNoReuse() throws IOException {
Page example = randomPage();
assertRoundTrip(example, Page::writeTo, in -> {
assertRoundTrip(example, writeTo(AbstractProto.CURRENT_VERSION), in -> {
Page page = new Page(example.columnInfo());
page.readFrom(in);
page.readFrom(new SqlDataInput(in, AbstractProto.CURRENT_VERSION));
return page;
});
}
@ -60,11 +67,15 @@ public class PageTests extends ESTestCase {
public void testRoundTripReuse() throws IOException {
Page example = randomPage();
Page target = new Page(example.columnInfo());
roundTrip(example, Page::writeTo, in -> {target.readFrom(in); return null;});
CheckedFunction<DataInput, Page, IOException> readFrom = in -> {
target.readFrom(new SqlDataInput(in, AbstractProto.CURRENT_VERSION));
return null;
};
roundTrip(example, writeTo(AbstractProto.CURRENT_VERSION), readFrom);
assertEquals(example, target);
example = randomPageContents(example.columnInfo());
roundTrip(example, Page::writeTo, in -> {target.readFrom(in); return null;});
roundTrip(example, writeTo(AbstractProto.CURRENT_VERSION), readFrom);
assertEquals(example, target);
}
@ -89,4 +100,9 @@ public class PageTests extends ESTestCase {
}).toString());
}
private static CheckedBiConsumer<Page, DataOutput, IOException> writeTo(int version) {
return (page, in) ->
page.writeTo(new SqlDataOutput(in, version));
}
}

View File

@ -7,12 +7,12 @@ package org.elasticsearch.xpack.sql.plugin.jdbc;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Payload;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.type.DataType;
import org.joda.time.ReadableInstant;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.List;
@ -25,12 +25,12 @@ public class RowSetPayload implements Payload {
}
@Override
public void readFrom(DataInput in) throws IOException {
public void readFrom(SqlDataInput in) throws IOException {
throw new UnsupportedOperationException("This class can only be serialized");
}
@Override
public void writeTo(DataOutput out) throws IOException {
public void writeTo(SqlDataOutput out) throws IOException {
out.writeInt(rowSet.size());
List<DataType> types = rowSet.schema().types();

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.protocol.shared;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.RequestType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -34,7 +33,7 @@ public abstract class AbstractErrorResponse extends Response {
}
@Override
protected final void writeTo(int clientVersion, DataOutput out) throws IOException {
protected final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(message);
out.writeUTF(cause);
out.writeUTF(stack);

View File

@ -9,7 +9,6 @@ import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Objects;
@ -49,7 +48,7 @@ public abstract class AbstractExceptionResponse extends Response {
}
@Override
protected final void writeTo(int clientVersion, DataOutput out) throws IOException {
protected final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(message);
out.writeUTF(cause);
exceptionType.writeTo(out);

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -35,7 +33,7 @@ public abstract class AbstractInfoRequest extends Request {
this.osVersion = osVersion;
}
protected AbstractInfoRequest(int clientVersion, DataInput in) throws IOException {
protected AbstractInfoRequest(SqlDataInput in) throws IOException {
jvmVersion = in.readUTF();
jvmVendor = in.readUTF();
jvmClassPath = in.readUTF();
@ -44,7 +42,7 @@ public abstract class AbstractInfoRequest extends Request {
}
@Override
public final void writeTo(DataOutput out) throws IOException {
public final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(jvmVersion);
out.writeUTF(jvmVendor);
out.writeUTF(jvmClassPath);

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
@ -40,7 +39,7 @@ public abstract class AbstractInfoResponse extends Response {
}
@Override
protected final void writeTo(int clientVersion, DataOutput out) throws IOException {
protected final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(node);
out.writeUTF(cluster);
out.writeByte(majorVersion);

View File

@ -32,7 +32,7 @@ public abstract class AbstractProto {
public void writeRequest(Request request, DataOutput out) throws IOException {
writeHeader(CURRENT_VERSION, out);
request.requestType().writeTo(out);
request.writeTo(out);
request.writeTo(new SqlDataOutput(out, CURRENT_VERSION));
}
public Request readRequest(DataInput in) throws IOException {
@ -40,13 +40,13 @@ public abstract class AbstractProto {
if (clientVersion > CURRENT_VERSION) {
throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade client last.");
}
return readRequestType(in).reader().read(clientVersion, in);
return readRequestType(in).reader().read(new SqlDataInput(in, clientVersion));
}
public void writeResponse(Response response, int clientVersion, DataOutput out) throws IOException {
writeHeader(clientVersion, out);
response.responseType().writeTo(out);
response.writeTo(clientVersion, out);
response.writeTo(new SqlDataOutput(out, clientVersion));
}
public Response readResponse(Request request, DataInput in) throws IOException {
@ -56,7 +56,7 @@ public abstract class AbstractProto {
+ CURRENT_VERSION + "]. Server is busted.");
}
// TODO why do I need the response type at all? Just a byte for err/exception/normal, then get response type from request.
Response response = readResponseType(in).reader().read(request, in);
Response response = readResponseType(in).reader().read(request, new SqlDataInput(in, version));
if (response.requestType() != request.requestType()) {
throw new IOException("Expected request type to be [" + request.requestType()
+ "] but was [" + response.requestType() + "]. Server is busted.");
@ -107,7 +107,7 @@ public abstract class AbstractProto {
protected abstract ResponseType readResponseType(DataInput in) throws IOException;
@FunctionalInterface
protected interface RequestReader {
Request read(int clientVersion, DataInput in) throws IOException;
Request read(SqlDataInput in) throws IOException;
}
protected interface RequestType {
void writeTo(DataOutput out) throws IOException;
@ -115,7 +115,7 @@ public abstract class AbstractProto {
}
@FunctionalInterface
protected interface ResponseReader {
Response read(Request request, DataInput in) throws IOException;
Response read(Request request, SqlDataInput in) throws IOException;
}
protected interface ResponseType {
void writeTo(DataOutput out) throws IOException;

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import java.util.TimeZone;
@ -29,7 +27,7 @@ public abstract class AbstractQueryInitRequest extends Request {
this.timeout = timeout;
}
protected AbstractQueryInitRequest(int clientVersion, DataInput in) throws IOException {
protected AbstractQueryInitRequest(SqlDataInput in) throws IOException {
query = in.readUTF();
fetchSize = in.readInt();
timeZone = TimeZone.getTimeZone(in.readUTF());
@ -37,7 +35,7 @@ public abstract class AbstractQueryInitRequest extends Request {
}
@Override
public void writeTo(DataOutput out) throws IOException {
public void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(query);
out.writeInt(fetchSize);
out.writeUTF(timeZone.getID());

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
@ -27,14 +25,14 @@ public abstract class AbstractQueryPageRequest extends Request {
this.timeout = timeout;
}
protected AbstractQueryPageRequest(int clientVersion, DataInput in) throws IOException {
protected AbstractQueryPageRequest(SqlDataInput in) throws IOException {
this.cursor = new byte[ProtoUtil.readArraySize(in)];
in.readFully(cursor);
this.timeout = new TimeoutInfo(in);
}
@Override
public void writeTo(DataOutput out) throws IOException {
public void writeTo(SqlDataOutput out) throws IOException {
out.writeInt(cursor.length);
out.write(cursor);
timeout.writeTo(out);

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
@ -35,7 +34,7 @@ public abstract class AbstractQueryResponse extends Response {
}
@Override
protected void writeTo(int clientVersion, DataOutput out) throws IOException {
protected void writeTo(SqlDataOutput out) throws IOException {
out.writeLong(tookNanos);
out.writeInt(cursor.length);
out.write(cursor);

View File

@ -20,7 +20,7 @@ public abstract class Request {
* Write this request to the {@link DataOutput}. Implementers should
* be kind and stick this right under the ctor that reads the response.
*/
protected abstract void writeTo(DataOutput out) throws IOException;
protected abstract void writeTo(SqlDataOutput out) throws IOException;
/**
* Body to go into the {@link #toString()} result.

View File

@ -19,11 +19,8 @@ public abstract class Response {
/**
* Write this response to the {@link DataOutput}.
* @param clientVersion The version of the client that requested
* the message. This should be used to send a response compatible
* with the client.
*/
protected abstract void writeTo(int clientVersion, DataOutput out) throws IOException;
protected abstract void writeTo(SqlDataOutput out) throws IOException;
/**
* Body to go into the {@link #toString()} result.

View File

@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.IOException;
/**
* {@linkplain DataInput} customized for SQL. It has:
* <ul>
* <li>{@link #version}. This allows us to add new fields
* to the protocol in a backwards compatible way by bumping
* the version number.</li>
* </ul>
*/public final class SqlDataInput implements DataInput {
private final DataInput delegate;
private final int version;
public SqlDataInput(DataInput delegate, int version) {
this.delegate = delegate;
this.version = version;
}
/**
* Version of the protocol to use. When new fields are added
* to the protocol we bump the maximum version. Requests and
* responses use the minimum version understood by both the
* client and the server.
*/
public int version() {
return version;
}
@Override
public void readFully(byte[] b) throws IOException {
delegate.readFully(b);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
delegate.readFully(b, off, len);
}
@Override
public int skipBytes(int n) throws IOException {
return delegate.skipBytes(n);
}
@Override
public boolean readBoolean() throws IOException {
return delegate.readBoolean();
}
@Override
public byte readByte() throws IOException {
return delegate.readByte();
}
@Override
public int readUnsignedByte() throws IOException {
return delegate.readUnsignedByte();
}
@Override
public short readShort() throws IOException {
return delegate.readShort();
}
@Override
public int readUnsignedShort() throws IOException {
return delegate.readUnsignedShort();
}
@Override
public char readChar() throws IOException {
return delegate.readChar();
}
@Override
public int readInt() throws IOException {
return delegate.readInt();
}
@Override
public long readLong() throws IOException {
return delegate.readLong();
}
@Override
public float readFloat() throws IOException {
return delegate.readFloat();
}
@Override
public double readDouble() throws IOException {
return delegate.readDouble();
}
@Override
public String readLine() throws IOException {
return delegate.readLine();
}
@Override
public String readUTF() throws IOException {
return delegate.readUTF();
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataOutput;
import java.io.IOException;
/**
* {@linkplain DataOutput} customized for SQL. It has:
* <ul>
* <li>{@link #version}. This allows us to add new fields
* to the protocol in a backwards compatible way by bumping
* the version number.</li>
* </ul>
*/
public final class SqlDataOutput implements DataOutput {
private final DataOutput delegate;
private final int version;
public SqlDataOutput(DataOutput delegate, int version) {
this.delegate = delegate;
this.version = version;
}
/**
* Version of the protocol to use. When new fields are added
* to the protocol we bump the maximum version. Requests and
* responses use the minimum version understood by both the
* client and the server.
*/
public int version() {
return version;
}
@Override
public void write(int b) throws IOException {
delegate.write(b);
}
@Override
public void write(byte[] b) throws IOException {
delegate.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
delegate.write(b, off, len);
}
@Override
public void writeBoolean(boolean v) throws IOException {
delegate.writeBoolean(v);
}
@Override
public void writeByte(int v) throws IOException {
delegate.writeByte(v);
}
@Override
public void writeShort(int v) throws IOException {
delegate.writeShort(v);
}
@Override
public void writeChar(int v) throws IOException {
delegate.writeChar(v);
}
@Override
public void writeInt(int v) throws IOException {
delegate.writeInt(v);
}
@Override
public void writeLong(long v) throws IOException {
delegate.writeLong(v);
}
@Override
public void writeFloat(float v) throws IOException {
delegate.writeFloat(v);
}
@Override
public void writeDouble(double v) throws IOException {
delegate.writeDouble(v);
}
@Override
public void writeBytes(String s) throws IOException {
delegate.writeBytes(s);
}
@Override
public void writeChars(String s) throws IOException {
delegate.writeChars(s);
}
@Override
public void writeUTF(String s) throws IOException {
delegate.writeUTF(s);
}
}

View File

@ -27,21 +27,21 @@ public abstract class RoundTripTestUtils {
// Only static utilities here
}
public static <T> void assertRoundTrip(T example, CheckedBiConsumer<T, DataOutput, IOException> encode,
CheckedFunction<DataInput, T, IOException> decode) throws IOException {
T once = roundTrip(example, encode, decode);
public static <T> void assertRoundTrip(T example, CheckedBiConsumer<T, DataOutput, IOException> writeTo,
CheckedFunction<DataInput, T, IOException> readFrom) throws IOException {
T once = roundTrip(example, writeTo, readFrom);
assertEquals(example, once);
T twice = roundTrip(once, encode, decode);
T twice = roundTrip(once, writeTo, readFrom);
assertEquals(example, twice);
assertEquals(once, twice);
}
public static <T> T roundTrip(T example, CheckedBiConsumer<T, DataOutput, IOException> encode,
CheckedFunction<DataInput, T, IOException> decode) throws IOException {
public static <T> T roundTrip(T example, CheckedBiConsumer<T, DataOutput, IOException> writeTo,
CheckedFunction<DataInput, T, IOException> readFrom) throws IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
encode.accept(example, new DataOutputStream(out));
writeTo.accept(example, new DataOutputStream(out));
try (InputStream in = new ByteArrayInputStream(out.toByteArray())) {
T decoded = decode.apply(new DataInputStream(in));
T decoded = readFrom.apply(new DataInputStream(in));
assertEquals("should have emptied the stream", 0, in.available());
return decoded;
}