Remove many NOCOMMITs from cli protocol

* Move read from a static method to a ctor to mirror core.
* Make read and writes read and write the same data.
* Instead of the "header" integer use a byte for the response
type.
* For responses that do not make their request type obvious
then serialize the request type.
* Remove the request type member from requests and responses and
replace with an abstract method. These type members have caused
us trouble in core in the past.
* Remove the Message superclass as it didn't have anything in it.
* Pass client version to the request reader and response writer.
* Add round trip tests for the protocol.
* Force Requests and Responses to provide good `toString`, `equals`,
and `hashCode`.

Original commit: elastic/x-pack-elasticsearch@653ed8c27f
This commit is contained in:
Nik Everett 2017-07-10 11:36:08 -04:00
parent 0fe1e4bb48
commit 3759349db8
25 changed files with 615 additions and 398 deletions

View File

@ -59,9 +59,9 @@ import org.elasticsearch.xpack.security.user.ElasticUser;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.server.cli.CliAction;
import java.util.Arrays;
import java.util.Collections;

View File

@ -5,29 +5,50 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import java.util.Objects;
public class CommandRequest extends Request {
public final String command;
public CommandRequest(String command) {
super(Action.COMMAND);
this.command = command;
}
CommandRequest(int clientVersion, DataInput in) throws IOException {
command = in.readUTF();
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(action.value());
public void write(DataOutput out) throws IOException {
out.writeUTF(command);
}
public static CommandRequest decode(DataInput in) throws IOException {
String result = in.readUTF();
return new CommandRequest(result);
@Override
protected String toStringBody() {
return command;
}
@Override
public RequestType requestType() {
return RequestType.COMMAND;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
CommandRequest other = (CommandRequest) obj;
return Objects.equals(command, other.command);
}
@Override
public int hashCode() {
return Objects.hash(command);
}
}

View File

@ -5,23 +5,20 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Status;
public class CommandResponse extends Response {
public final long serverTimeQueryReceived, serverTimeResponseSent;
public final String requestId;
public final String data;
public CommandResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, String data) {
super(Action.COMMAND);
this.serverTimeQueryReceived = serverTimeQueryReceived;
this.serverTimeResponseSent = serverTimeResponseSent;
this.requestId = requestId;
@ -29,17 +26,15 @@ public class CommandResponse extends Response {
this.data = data;
}
public CommandResponse(DataInput in) throws IOException {
super(Action.COMMAND);
CommandResponse(DataInput in) throws IOException {
serverTimeQueryReceived = in.readLong();
serverTimeResponseSent = in.readLong();
requestId = in.readUTF();
data = in.readUTF();
}
public void encode(DataOutput out) throws IOException {
out.writeInt(Status.toSuccess(action)); // NOCOMMIT not symetric!
@Override
void write(int clientVersion, DataOutput out) throws IOException {
out.writeLong(serverTimeQueryReceived);
out.writeLong(serverTimeResponseSent);
out.writeUTF(requestId);
@ -47,11 +42,21 @@ public class CommandResponse extends Response {
}
@Override
public String toString() {
return "CommandResponse<received=[" + serverTimeQueryReceived
protected String toStringBody() {
return "received=[" + serverTimeQueryReceived
+ "] sent=[" + serverTimeResponseSent
+ "] requestId=[" + requestId
+ "] data=[" + data + "]>";
+ "] data=[" + data + "]";
}
@Override
RequestType requestType() {
return RequestType.COMMAND;
}
@Override
ResponseType responseType() {
return ResponseType.COMMAND;
}
@Override
@ -70,4 +75,4 @@ public class CommandResponse extends Response {
public int hashCode() {
return Objects.hash(serverTimeQueryReceived, serverTimeResponseSent, requestId, data);
}
}
}

View File

@ -5,36 +5,76 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Status;
/**
* Response sent when there is a server side error.
*/
public class ErrorResponse extends Response {
private final RequestType requestType;
public final String message, cause, stack;
public ErrorResponse(Action requestedAction, String message, String cause, String stack) {
super(requestedAction);
public ErrorResponse(RequestType requestType, String message, String cause, String stack) {
this.requestType = requestType;
this.message = message;
this.cause = cause;
this.stack = stack;
}
ErrorResponse(DataInput in) throws IOException {
requestType = RequestType.read(in);
message = in.readUTF();
cause = in.readUTF();
stack = in.readUTF();
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(Status.toError(action));
void write(int clientVersion, DataOutput out) throws IOException {
requestType.write(out);
out.writeUTF(message);
out.writeUTF(cause);
out.writeUTF(stack);
}
public static ErrorResponse decode(DataInput in, Action action) throws IOException {
String message = in.readUTF();
String cause = in.readUTF();
String stack = in.readUTF();
return new ErrorResponse(action, message, cause, stack);
@Override
protected String toStringBody() {
return "request=[" + requestType
+ "] message=[" + message
+ "] cuase=[" + cause
+ "] stack=[" + stack + "]";
}
@Override
RequestType requestType() {
return requestType;
}
@Override
ResponseType responseType() {
return ResponseType.ERROR;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ErrorResponse other = (ErrorResponse) obj;
return Objects.equals(requestType, other.requestType)
&& Objects.equals(message, other.message)
&& Objects.equals(cause, other.cause)
&& Objects.equals(stack, other.stack);
}
@Override
public int hashCode() {
return Objects.hash(requestType, message, cause, stack);
}
}

View File

@ -5,34 +5,70 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Status;
/**
* Response sent when there is a client side error.
*/
public class ExceptionResponse extends Response {
private final RequestType requestType;
public final String message, cause;
public ExceptionResponse(Action requestedAction, String message, String cause) {
super(requestedAction);
public ExceptionResponse(RequestType requestType, String message, String cause) {
this.requestType = requestType;
this.message = message;
this.cause = cause;
}
ExceptionResponse(DataInput in) throws IOException {
requestType = RequestType.read(in);
message = in.readUTF();
cause = in.readUTF();
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(Status.toException(action));
void write(int clientVersion, DataOutput out) throws IOException {
requestType.write(out);
out.writeUTF(message);
out.writeUTF(cause);
}
public static ExceptionResponse decode(DataInput in, Action action) throws IOException {
String message = in.readUTF();
String cause = in.readUTF();
@Override
protected String toStringBody() {
return "request=[" + requestType
+ "] message=[" + message
+ "] cuase=[" + cause + "]";
}
return new ExceptionResponse(action, message, cause);
@Override
RequestType requestType() {
return requestType;
}
@Override
ResponseType responseType() {
return ResponseType.EXCEPTION;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ExceptionResponse other = (ExceptionResponse) obj;
return Objects.equals(requestType, other.requestType)
&& Objects.equals(message, other.message)
&& Objects.equals(cause, other.cause);
}
@Override
public int hashCode() {
return Objects.hash(requestType, message, cause);
}
}

View File

@ -5,32 +5,28 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import java.util.Objects;
public class InfoRequest extends Request {
private static final String EMPTY = "";
public final String jvmVersion, jvmVendor, jvmClassPath, osName, osVersion;
/**
* Build the info request containing information about the current JVM.
*/
public InfoRequest() {
super(Action.INFO);
jvmVersion = System.getProperty("java.version", EMPTY);
jvmVendor = System.getProperty("java.vendor", EMPTY);
jvmClassPath = System.getProperty("java.class.path", EMPTY);
osName = System.getProperty("os.name", EMPTY);
osVersion = System.getProperty("os.version", EMPTY);
jvmVersion = System.getProperty("java.version", "");
jvmVendor = System.getProperty("java.vendor", "");
jvmClassPath = System.getProperty("java.class.path", "");
osName = System.getProperty("os.name", "");
osVersion = System.getProperty("os.version", "");
}
public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
super(Action.INFO);
InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
this.jvmVersion = jvmVersion;
this.jvmVendor = jvmVendor;
this.jvmClassPath = jvmClassPath;
@ -38,9 +34,16 @@ public class InfoRequest extends Request {
this.osVersion = osVersion;
}
InfoRequest(int clientVersion, DataInput in) throws IOException {
jvmVersion = in.readUTF();
jvmVendor = in.readUTF();
jvmClassPath = in.readUTF();
osName = in.readUTF();
osVersion = in.readUTF();
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(action.value());
public void write(DataOutput out) throws IOException {
out.writeUTF(jvmVersion);
out.writeUTF(jvmVendor);
out.writeUTF(jvmClassPath);
@ -48,13 +51,35 @@ public class InfoRequest extends Request {
out.writeUTF(osVersion);
}
public static InfoRequest decode(DataInput in) throws IOException {
String jvmVersion = in.readUTF();
String jvmVendor = in.readUTF();
String jvmClassPath = in.readUTF();
String osName = in.readUTF();
String osVersion = in.readUTF();
@Override
protected String toStringBody() {
return "jvm=[version=[" + jvmVersion
+ "] vendor=[" + jvmVendor
+ "] classPath=[" + jvmClassPath
+ "]] os=[name=[" + osName
+ "] version=[" + osVersion + "]]";
}
return new InfoRequest(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
@Override
public RequestType requestType() {
return RequestType.INFO;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
InfoRequest other = (InfoRequest) obj;
return Objects.equals(jvmVersion, other.jvmVersion)
&& Objects.equals(jvmVendor, other.jvmVendor)
&& Objects.equals(jvmClassPath, other.jvmClassPath)
&& Objects.equals(osName, other.osName)
&& Objects.equals(osVersion, other.osVersion);
}
@Override
public int hashCode() {
return Objects.hash(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}
}

View File

@ -5,21 +5,21 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Status;
import java.util.Objects;
public class InfoResponse extends Response {
public final String node, cluster, versionString, versionHash, versionDate;
public final int majorVersion, minorVersion;
public InfoResponse(String nodeName, String clusterName, byte versionMajor, byte versionMinor, String version, String versionHash, String versionDate) {
super(Action.INFO);
public InfoResponse(String nodeName, String clusterName, byte versionMajor, byte versionMinor, String version,
String versionHash, String versionDate) {
this.node = nodeName;
this.cluster = clusterName;
this.versionString = version;
@ -30,8 +30,18 @@ public class InfoResponse extends Response {
this.minorVersion = versionMinor;
}
public void encode(DataOutput out) throws IOException {
out.writeInt(Status.toSuccess(action));
InfoResponse(DataInput in) throws IOException {
node = in.readUTF();
cluster = in.readUTF();
majorVersion = in.readByte();
minorVersion = in.readByte();
versionString = in.readUTF();
versionHash = in.readUTF();
versionDate = in.readUTF();
}
@Override
void write(int clientVersion, DataOutput out) throws IOException {
out.writeUTF(node);
out.writeUTF(cluster);
out.writeByte(majorVersion);
@ -41,15 +51,44 @@ public class InfoResponse extends Response {
out.writeUTF(versionDate);
}
public static InfoResponse decode(DataInput in) throws IOException {
String node = in.readUTF();
String cluster = in.readUTF();
byte versionMajor = in.readByte();
byte versionMinor = in.readByte();
String version = in.readUTF();
String versionHash = in.readUTF();
String versionBuild = in.readUTF();
@Override
protected String toStringBody() {
return "node=[" + node
+ "] cluster=[" + cluster
+ "] version=[" + versionString
+ "]/[major=[" + majorVersion
+ "] minor=[" + minorVersion
+ "] hash=[" + versionHash
+ "] date=[" + versionDate + "]";
}
return new InfoResponse(node, cluster, versionMajor, versionMinor, version, versionHash, versionBuild);
@Override
RequestType requestType() {
return RequestType.INFO;
}
@Override
ResponseType responseType() {
return ResponseType.INFO;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
InfoResponse other = (InfoResponse) obj;
return Objects.equals(node, other.node)
&& Objects.equals(cluster, other.cluster)
&& Objects.equals(majorVersion, other.majorVersion)
&& Objects.equals(minorVersion, other.minorVersion)
&& Objects.equals(versionString, other.versionString)
&& Objects.equals(versionHash, other.versionHash)
&& Objects.equals(versionDate, other.versionDate);
}
@Override
public int hashCode() {
return Objects.hash(node, cluster, majorVersion, minorVersion, versionString, versionHash, versionDate);
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import java.io.DataOutput;
import java.io.IOException;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
abstract class Message {
public final Action action;
protected Message(Action action) {
this.action = action;
}
@Override
public String toString() {
return action.name();
}
public abstract void encode(DataOutput out) throws IOException;
}

View File

@ -5,153 +5,131 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static java.util.stream.Collectors.toMap;
/**
* Binary protocol for the CLI. All backwards compatibility is done using the
* version number sent in the header.
*/
public abstract class Proto {
private static final int MAGIC_NUMBER = 0x0C0DEC110;
public static final int CURRENT_VERSION = 000_000_001;
//
// Basic tabular messaging for the CLI
//
// The protocol is very similar (a subset) to the JDBC driver
//
// To simplify things, the protocol is NOT meant to be backwards compatible.
//
public interface Proto {
// All requests start with
// magic_number - int - just because
// version - int - the version the client understands
// action - int - action to perform
// <action-param> (see below)
int MAGIC_NUMBER = 0x0C0DEC110;
int VERSION = 000_000_001;
public interface Header {
int value();
private Proto() {
// Static utilities
}
// The response start with a similar pattern
// magic_number
// version
// action reply (status)
// payload
public static void writeRequest(Request request, DataOutput out) throws IOException {
writeHeader(CURRENT_VERSION, out);
request.requestType().write(out);
request.write(out);
}
enum Status implements Header {
// If successful, each method has its own params (describe for each method)
SUCCESS (0x5000000),
// Expected exceptions contain
// message - string - exception message
// exception - string - exception class
// sql exception - int - to what SqlException type this maps to (see below)
EXCEPTION(0x3000000),
// Unexpected error contains the following fields
// message - string - exception message
// exception - string - exception class
// stacktrace - string - exception stacktrace (should be massaged)
ERROR (0xF000000);
private static final Map<Integer, Status> MAP = Arrays.stream(Status.class.getEnumConstants())
.collect(toMap(Status::value, Function.identity()));
private final int value;
Status(int value) {
this.value = value;
public static Request readRequest(DataInput in) throws IOException {
int clientVersion = readHeader(in);
if (clientVersion > CURRENT_VERSION) {
throw new IOException("Unknown client version [" + clientVersion + "]. Always upgrade sql last.");
// NOCOMMIT I believe we usually advise upgrading the clients *first* so this might be backwards.....
}
@Override
public int value() {
return value;
return RequestType.read(in).reader.read(clientVersion, in);
}
public static void writeResponse(Response response, int clientVersion, DataOutput out) throws IOException {
writeHeader(clientVersion, out);
response.responseType().write(out);
response.write(clientVersion, out);
}
public static Response readResponse(RequestType expectedRequestType, DataInput in) throws IOException {
int version = readHeader(in);
if (version != CURRENT_VERSION) {
throw new IOException("Response version [" + version + "] does not match client version ["
+ CURRENT_VERSION + "]. Server is busted.");
}
Response response = ResponseType.read(in).reader.read(in);
if (response.requestType() != expectedRequestType) {
throw new IOException("Expected request type to be [" + expectedRequestType
+ "] but was [" + response.requestType() + "]. Server is busted.");
}
return response;
}
private static void writeHeader(int clientVersion, DataOutput out) throws IOException {
out.writeInt(MAGIC_NUMBER);
out.writeInt(clientVersion);
}
/**
* Read the protocol header.
* @return the version
* @throws IOException if there is an underlying {@linkplain IOException} or if the protocol is malformed
*/
private static int readHeader(DataInput in) throws IOException {
int magic = in.readInt();
if (magic != MAGIC_NUMBER) {
throw new IOException("Unknown protocol magic number [" + Integer.toHexString(magic) + "]");
}
int version = in.readInt();
return version;
}
@FunctionalInterface
interface RequestReader {
Request read(int clientVersion, DataInput in) throws IOException;
}
public enum RequestType {
INFO(InfoRequest::new),
COMMAND(CommandRequest::new);
private final RequestReader reader;
RequestType(RequestReader reader) {
this.reader = reader;
}
public static Status from(int value) {
return MAP.get(value & 0xF000000);
void write(DataOutput out) throws IOException {
out.writeByte(ordinal());
}
public static int toSuccess(Action action) {
return action.value() | SUCCESS.value();
}
public static int toException(Action action) {
return action.value() | EXCEPTION.value();
}
public static int toError(Action action) {
return action.value() | ERROR.value();
static RequestType read(DataInput in) throws IOException {
byte b = in.readByte();
try {
return values()[b];
} catch (ArrayIndexOutOfBoundsException e) {
throw new IllegalArgumentException("Unknown response type [" + b + "]", e);
}
}
}
//
// RPC
//
@FunctionalInterface
interface ResponseReader {
Response read(DataInput in) throws IOException;
}
enum ResponseType {
EXCEPTION(ExceptionResponse::new),
ERROR(ErrorResponse::new),
INFO(InfoResponse::new),
COMMAND(CommandResponse::new);
enum Action implements Header {
private final ResponseReader reader;
//
// Retrieves information about the server
//
//
// java.version - string
// java.vendor - string
// java.class.path - string
// os.name - string
// os.version - string
//
//
// node.name - string
// cluster.name - string
// version.major - byte
// version.minor - byte
// version.number - string
// version.hash - string
// version.build - string
// # nodes - fall back nodes to connect to
// for each node
// node.name - string
// node.address - string
//
INFO(0x01),
//
// Retrieves metadata about tables
//
// Request:
//
// name pattern - string
//
// Response:
//
// # tables - int - index.type
// for each table
// name - string - table name
//
COMMAND(0x10);
private static final Map<Integer, Action> MAP = Arrays.stream(Action.class.getEnumConstants())
.collect(toMap(Action::value, Function.identity()));
private final int value;
Action(int value) {
this.value = value;
ResponseType(ResponseReader reader) {
this.reader = reader;
}
@Override
public int value() {
return value;
void write(DataOutput out) throws IOException {
out.writeByte(ordinal());
}
public static Action from(int value) {
return MAP.get(value & 0x00000FF);
static ResponseType read(DataInput in) throws IOException {
byte b = in.readByte();
try {
return values()[b];
} catch (ArrayIndexOutOfBoundsException e) {
throw new IllegalArgumentException("Unknown response type [" + b + "]", e);
}
}
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Locale;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Status;
import static java.lang.String.format;
import static org.elasticsearch.xpack.sql.cli.net.protocol.Proto.MAGIC_NUMBER;
import static org.elasticsearch.xpack.sql.cli.net.protocol.Proto.VERSION;
public abstract class ProtoUtils {
public static void write(DataOutput out, Message m) throws IOException {
out.writeInt(MAGIC_NUMBER);
out.writeInt(VERSION);
m.encode(out);
}
public static Request readRequest(DataInput in) throws IOException {
switch (Action.from(in.readInt())) {
case INFO:
return InfoRequest.decode(in);
case COMMAND:
return CommandRequest.decode(in);
default:
// cannot find action type
return null;
}
}
public static Response readResponse(DataInput in, int header) throws IOException {
Action action = Action.from(header);
switch (Status.from(header)) {
case EXCEPTION:
return ExceptionResponse.decode(in, action);
case ERROR:
return ErrorResponse.decode(in, action);
case SUCCESS:
switch (action) {
case INFO:
return InfoResponse.decode(in);
case COMMAND:
return new CommandResponse(in);
default:
// cannot find action type
return null; // NOCOMMIT throw an exception?
}
default:
return null;
}
}
public static String readHeader(DataInput in) throws IOException {
if (MAGIC_NUMBER != in.readInt()) {
return "Invalid protocol";
}
int ver = in.readInt();
if (VERSION != ver) {
return format(Locale.ROOT, "Expected JDBC protocol version %s, found %s", VERSION, ver);
}
return null;
}
}

View File

@ -5,11 +5,45 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
public abstract class Request extends Message {
import java.io.DataOutput;
import java.io.IOException;
public Request(Action action) {
super(action);
public abstract class Request {
@Override
public final String toString() {
return getClass().getSimpleName() + "<" + toStringBody() + ">";
}
/**
* Write this request to the {@link DataOutput}. Implementers should
* be kind and stick this right under the ctor that reads the response.
*/
abstract void write(DataOutput out) throws IOException;
/**
* Body to go into the {@link #toString()} result.
*/
protected abstract String toStringBody();
/**
* Type of this request.
*/
public abstract RequestType requestType();
/*
* Must properly implement {@linkplain #equals(Object)} for
* round trip testing.
*/
@Override
public abstract boolean equals(Object obj);
/*
* Must properly implement {@linkplain #hashCode()} for
* round trip testing.
*/
@Override
public abstract int hashCode();
}

View File

@ -5,11 +5,52 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
public abstract class Response extends Message {
import java.io.DataOutput;
import java.io.IOException;
public Response(Action action) {
super(action);
public abstract class Response {
@Override
public final String toString() {
return getClass().getSimpleName() + "<" + toStringBody() + ">";
}
/**
* Write this response to the {@link DataOutput}.
* @param clientVersion The version of the client that requested
* the message. This should be used to send a response compatible
* with the client.
*/
abstract void write(int clientVersion, DataOutput out) throws IOException;
/**
* Body to go into the {@link #toString()} result.
*/
protected abstract String toStringBody();
/**
* Type of the request for which this is the response.
*/
abstract RequestType requestType();
/**
* Type of this response.
*/
abstract ResponseType responseType();
/*
* Must properly implement {@linkplain #equals(Object)} for
* round trip testing.
*/
@Override
public abstract boolean equals(Object obj);
/*
* Must properly implement {@linkplain #hashCode()} for
* round trip testing.
*/
@Override
public abstract int hashCode();
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.test.RoundTripTestUtils;
import java.io.IOException;
public final class CliRoundTripTestUtils {
private CliRoundTripTestUtils() {
// Just static utilities
}
static void assertRoundTripCurrentVersion(Request request) throws IOException {
RoundTripTestUtils.assertRoundTrip(request, Proto::writeRequest, Proto::readRequest);
}
static void assertRoundTripCurrentVersion(Response response) throws IOException {
RoundTripTestUtils.assertRoundTrip(response,
(r, out) -> Proto.writeResponse(r, Proto.CURRENT_VERSION, out),
in -> Proto.readResponse(response.requestType(), in));
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class CommandRequestTests extends ESTestCase {
static CommandRequest randomCommandRequest() {
return new CommandRequest(randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomCommandRequest());
}
}

View File

@ -9,8 +9,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.test.RoundTripTestUtils.assertRoundTrip;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class CommandResponseTests extends ESTestCase {
static CommandResponse randomCommandResponse() {
long start = randomNonNegativeLong();
@ -19,9 +18,6 @@ public class CommandResponseTests extends ESTestCase {
}
public void testRoundTrip() throws IOException {
assertRoundTrip(randomCommandResponse(), CommandResponse::encode, in -> {
// NOCOMMIT make symmetric
return (CommandResponse) ProtoUtils.readResponse(in, in.readInt());
});
assertRoundTripCurrentVersion(randomCommandResponse());
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class ErrorResponseTests extends ESTestCase {
static ErrorResponse randomErrorResponse() {
return new ErrorResponse(randomFrom(RequestType.values()), randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomErrorResponse());
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class ExceptionResponseTests extends ESTestCase {
static ExceptionResponse randomExceptionResponse() {
return new ExceptionResponse(randomFrom(RequestType.values()), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomExceptionResponse());
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class InfoRequestTests extends ESTestCase {
static InfoRequest randomInfoRequest() {
return new InfoRequest(randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5),
randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomInfoRequest());
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class InfoResponseTests extends ESTestCase {
static InfoResponse randomInfoResponse() {
return new InfoResponse(randomAlphaOfLength(5), randomAlphaOfLength(5), randomByte(), randomByte(),
randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomInfoResponse());
}
}

View File

@ -9,8 +9,8 @@ import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.CliException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
@ -20,23 +20,20 @@ import java.io.DataInputStream;
import java.io.IOException;
public class CliHttpClient implements AutoCloseable {
private final HttpClient http;
private final CliConfiguration cfg;
public CliHttpClient(CliConfiguration cfg) {
http = new HttpClient(cfg);
this.cfg = cfg;
}
public Response serverInfo() {
Bytes ba = http.put(out -> ProtoUtils.write(out, new InfoRequest()));
return doIO(ba, in -> readResponse(in, Action.INFO));
Bytes ba = http.put(out -> Proto.writeRequest(new InfoRequest(), out));
return doIO(ba, in -> Proto.readResponse(RequestType.INFO, in));
}
public Response command(String command, String requestId) {
Bytes ba = http.put(out -> ProtoUtils.write(out, new CommandRequest(command)));
return doIO(ba, in -> readResponse(in, Action.COMMAND));
Bytes ba = http.put(out -> Proto.writeRequest(new CommandRequest(command), out));
return doIO(ba, in -> Proto.readResponse(RequestType.COMMAND, in));
}
private static <T> T doIO(Bytes ba, DataInputFunction<T> action) {
@ -47,23 +44,6 @@ public class CliHttpClient implements AutoCloseable {
}
}
@SuppressWarnings("unchecked")
private static <R extends Response> R readResponse(DataInput in, Action expected) throws IOException {
String errorMessage = ProtoUtils.readHeader(in);
if (errorMessage != null) {
throw new CliException(errorMessage);
}
int header = in.readInt();
Action action = Action.from(header);
if (expected != action) {
throw new CliException("Expected response for %s, found %s", expected, action);
}
return (R) ProtoUtils.readResponse(in, header);
}
public void close() {}
@FunctionalInterface

View File

@ -9,7 +9,7 @@ import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.TestUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Request;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.server.cli.CliServer;
@ -26,13 +26,13 @@ class CliProtoHandler extends ProtoHandler<Response> {
private final CliServer server;
CliProtoHandler(Client client) {
super(client, ProtoUtils::readHeader, CliServerProtoUtils::write);
super(client, in -> null, CliServerProtoUtils::write);
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(), info.getBuild());
}
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
Request req = ProtoUtils.readRequest(in);
Request req = Proto.readRequest(in);
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex)));
}
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedStringBuilder;
@ -23,7 +23,7 @@ public class ResponseToStringTests extends ESTestCase {
}
public void testExceptionResponse() {
AttributedStringBuilder s = ResponseToString.toAnsi(new ExceptionResponse(Action.INFO, "test message", "test cause"));
AttributedStringBuilder s = ResponseToString.toAnsi(new ExceptionResponse(RequestType.INFO, "test message", "test cause"));
assertEquals("test message", unstyled(s));
assertEquals("[1;36mtest message[0m", fullyStyled(s));
}

View File

@ -5,10 +5,8 @@
*/
package org.elasticsearch.xpack.sql.server.cli;
import java.io.DataInputStream;
import java.io.IOException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
@ -16,13 +14,14 @@ import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.util.StringUtils;
import static org.elasticsearch.action.ActionListener.wrap;
import java.io.DataInputStream;
import java.io.IOException;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
@ -36,29 +35,16 @@ public class CliHttpHandler extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!request.hasContent()) {
return badProto(StringUtils.EMPTY);
throw new IllegalArgumentException("expected a request body");
}
try (DataInputStream in = new DataInputStream(request.content().streamInput())) {
String msg = ProtoUtils.readHeader(in);
if (msg != null) {
return badProto(msg);
}
try {
return c -> client.executeLocally(CliAction.INSTANCE, new CliRequest(ProtoUtils.readRequest(in)),
wrap(response -> cliResponse(c, response), ex -> error(c, ex)));
} catch (Exception ex) {
return badProto("Unknown message");
}
CliRequest cliRequest = new CliRequest(Proto.readRequest(in));
return c -> client.executeLocally(CliAction.INSTANCE, cliRequest,
ActionListener.wrap(response -> cliResponse(c, response), ex -> error(c, ex)));
}
}
private static RestChannelConsumer badProto(String message) {
return c -> c.sendResponse(new BytesRestResponse(BAD_REQUEST, TEXT_CONTENT_TYPE, message));
}
private static void cliResponse(RestChannel channel, CliResponse response) {
BytesRestResponse restResponse = null;
@ -71,8 +57,7 @@ public class CliHttpHandler extends BaseRestHandler {
channel.sendResponse(restResponse);
}
private void error(RestChannel channel, Exception ex) {
logger.debug("failed to parse sql request", ex);
private static void error(RestChannel channel, Exception ex) {
BytesRestResponse response = null;
try {
response = new BytesRestResponse(channel, ex);

View File

@ -47,10 +47,10 @@ public class CliServer {
command((CommandRequest) req, listener);
}
else {
listener.onResponse(new ExceptionResponse(req.action, "Invalid requested", null));
listener.onResponse(new ExceptionResponse(req.requestType(), "Invalid requested", null));
}
} catch (Exception ex) {
listener.onResponse(CliServerProtoUtils.exception(ex, req.action));
listener.onResponse(CliServerProtoUtils.exception(ex, req.requestType()));
}
}
@ -72,7 +72,7 @@ public class CliServer {
listener.onResponse(new CommandResponse(start, stop, requestId, CliUtils.toString(c)));
},
ex -> listener.onResponse(CliServerProtoUtils.exception(ex, req.action))));
ex -> listener.onResponse(CliServerProtoUtils.exception(ex, req.requestType()))));
}
public void queryPage(QueryPageRequest req, ActionListener<Response> listener) {

View File

@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import java.io.DataOutputStream;
@ -28,13 +28,13 @@ public abstract class CliServerProtoUtils {
public static BytesReference write(Response response) throws IOException {
try (BytesStreamOutput array = new BytesStreamOutput();
DataOutputStream out = new DataOutputStream(array)) {
ProtoUtils.write(out, response);
Proto.writeResponse(response, Proto.CURRENT_VERSION, out);
out.flush();
return array.bytes();
}
}
public static Response exception(Throwable cause, Action action) {
public static Response exception(Throwable cause, RequestType requestType) {
String message = EMPTY;
String cs = EMPTY;
if (cause != null) {
@ -45,13 +45,12 @@ public abstract class CliServerProtoUtils {
}
if (expectedException(cause)) {
return new ExceptionResponse(action, message, cs);
return new ExceptionResponse(requestType, message, cs);
}
else {
// TODO: might want to 'massage' this
StringWriter sw = new StringWriter();
cause.printStackTrace(new PrintWriter(sw));
return new ErrorResponse(action, message, cs, sw.toString());
return new ErrorResponse(requestType, message, cs, sw.toString());
}
}