SQL: Throw exceptions on errors (elastic/x-pack-elasticsearch#3066)

Instead of returning "error response" objects and then translating them
into SQL exceptions this just throws the SQL exceptions directly. This
means the CLI catches exceptions and prints out the messages which isn't
ideal if this were hot code but it isn't and this is a much simpler way
of doing things.

Original commit: elastic/x-pack-elasticsearch@08431d3941
This commit is contained in:
Nik Everett 2017-11-22 11:22:31 -05:00 committed by GitHub
parent a4915a5714
commit f97f56ba54
24 changed files with 181 additions and 627 deletions

View File

@ -52,8 +52,8 @@ import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialBasicS
import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialOrPlatinumMode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo;
public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
@Override
@ -163,6 +163,8 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
Request request = new MetaTableRequest("test");
ResponseException responseException = expectThrows(ResponseException.class, () -> jdbc(request));
assertThat(responseException.getMessage(), containsString("current license is non-compliant for [jdbc]"));
assertThat(responseException.getMessage(), containsString("security_exception"));
enableJdbcLicensing();
Response response = jdbc(request);

View File

@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractErrorResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
/**
* Response sent when there is a server side error.
*/
public class ErrorResponse extends AbstractErrorResponse {
public ErrorResponse(RequestType requestType, String message, String cause, String stack) {
super(requestType, message, cause, stack);
}
ErrorResponse(Request request, DataInput in) throws IOException {
super((RequestType) request.requestType(), in);
}
@Override
public ResponseType responseType() {
return ResponseType.ERROR;
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
/**
* Response sent when there is a client side error.
*/
public class ExceptionResponse extends AbstractExceptionResponse {
public ExceptionResponse(RequestType requestType, String message, String cause, SqlExceptionType exceptionType) {
super(requestType, message, cause, exceptionType);
}
ExceptionResponse(Request request, DataInput in) throws IOException {
super((RequestType) request.requestType(), in);
}
@Override
public ResponseType responseType() {
return ResponseType.EXCEPTION;
}
}

View File

@ -13,7 +13,7 @@ import java.io.IOException;
/**
* Binary protocol for the CLI. All backwards compatibility is done using the
* version number sent in the header.
* version number sent in the header.
*/
public final class Proto extends AbstractProto {
public static final Proto INSTANCE = new Proto();
@ -62,8 +62,6 @@ public final class Proto extends AbstractProto {
}
public enum ResponseType implements AbstractProto.ResponseType {
EXCEPTION(ExceptionResponse::new),
ERROR(ErrorResponse::new),
INFO(InfoResponse::new),
QUERY_INIT(QueryInitResponse::new),
QUERY_PAGE(QueryPageResponse::new);
@ -93,4 +91,4 @@ public final class Proto extends AbstractProto {
return reader;
}
}
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest;
public class ErrorResponseTests extends ESTestCase {
static ErrorResponse randomErrorResponse() {
return new ErrorResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryInitRequest(), randomErrorResponse());
}
public void testToString() {
assertEquals("ErrorResponse<request=[QUERY_INIT] message=[test] cause=[test] stack=[stack\nstack]>",
new ErrorResponse(RequestType.QUERY_INIT, "test", "test", "stack\nstack").toString());
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequestTests.randomQueryInitRequest;
public class ExceptionResponseTests extends ESTestCase {
static ExceptionResponse randomExceptionResponse() {
return new ExceptionResponse(RequestType.QUERY_INIT, randomAlphaOfLength(5), randomAlphaOfLength(5),
randomFrom(SqlExceptionType.values()));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryInitRequest(), randomExceptionResponse());
}
public void testToString() {
assertEquals("ExceptionResponse<request=[QUERY_INIT] message=[test] cause=[test] type=[SYNTAX]>",
new ExceptionResponse(RequestType.QUERY_INIT, "test", "test", SqlExceptionType.SYNTAX).toString());
}
}

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.client.shared.SuppressForbidden;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.StringUtils;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
@ -29,6 +28,7 @@ import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Properties;
import java.util.logging.LogManager;
@ -284,7 +284,11 @@ public class Cli {
}
private void executeServerInfo() {
term.writer().println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term));
try {
term.writer().println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term));
} catch (SQLException e) {
error("Error fetching server info", e.getMessage());
}
}
private static boolean isExit(String line) {
@ -293,24 +297,47 @@ public class Cli {
}
private void executeQuery(String line) throws IOException {
Response response = cliClient.queryInit(line, fetchSize);
QueryResponse response;
try {
response = cliClient.queryInit(line, fetchSize);
} catch (SQLException e) {
if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) {
error("Server error", e.getMessage());
} else {
error("Bad request", e.getMessage());
}
return;
}
while (true) {
term.writer().print(ResponseToString.toAnsi(response).toAnsi(term));
term.writer().flush();
if (response.responseType() == ResponseType.ERROR || response.responseType() == ResponseType.EXCEPTION) {
return;
}
QueryResponse queryResponse = (QueryResponse) response;
if (queryResponse.cursor().length == 0) {
if (response.cursor().length == 0) {
// Successfully finished the entire query!
return;
}
if (false == fetchSeparator.equals("")) {
term.writer().println(fetchSeparator);
}
response = cliClient.nextPage(queryResponse.cursor());
try {
response = cliClient.nextPage(response.cursor());
} catch (SQLException e) {
if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) {
error("Server error", e.getMessage());
} else {
error("Bad request", e.getMessage());
} return;
}
}
}
private void error(String type, String message) {
AttributedStringBuilder sb = new AttributedStringBuilder();
sb.append(type + " [", BOLD.foreground(RED));
sb.append(message, DEFAULT.boldOff().italic().foreground(YELLOW));
sb.append("]", BOLD.underlineOff().foreground(RED));
term.writer().print(sb.toAnsi(term));
}
static class FatalException extends RuntimeException {
FatalException(String message, Throwable cause) {
super(message, cause);

View File

@ -5,22 +5,21 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.SQLException;
import java.time.Instant;
import java.util.TimeZone;
@ -31,21 +30,20 @@ public class CliHttpClient {
this.cfg = cfg;
}
public InfoResponse serverInfo() {
public InfoResponse serverInfo() throws SQLException {
InfoRequest request = new InfoRequest();
// TODO think about error handling here
return (InfoResponse) sendRequest(request);
}
public Response queryInit(String query, int fetchSize) {
public QueryResponse queryInit(String query, int fetchSize) throws SQLException {
// TODO allow customizing the time zone - this is what session set/reset/get should be about
QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), timeout());
return sendRequest(request);
return (QueryResponse) sendRequest(request);
}
public Response nextPage(byte[] cursor) {
public QueryResponse nextPage(byte[] cursor) throws SQLException {
QueryPageRequest request = new QueryPageRequest(cursor, timeout());
return sendRequest(request);
return (QueryResponse) sendRequest(request);
}
private TimeoutInfo timeout() {
@ -53,28 +51,14 @@ public class CliHttpClient {
return new TimeoutInfo(clientTime, cfg.queryTimeout(), cfg.pageTimeout());
}
private Response sendRequest(Request request) {
return AccessController.doPrivileged((PrivilegedAction<Response>) () ->
private Response sendRequest(Request request) throws SQLException {
return AccessController.doPrivileged((PrivilegedAction<ResponseOrException<Response>>) () ->
JreHttpUrlConnection.http(cfg.asUrl(), cfg, con ->
con.post(
out -> Proto.INSTANCE.writeRequest(request, out),
in -> Proto.INSTANCE.readResponse(request, in),
(status, failure) -> {
if (status >= 500) {
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
failure.type(), failure.remoteTrace());
}
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
if (type == null) {
return new ErrorResponse((RequestType) request.requestType(),
"Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]",
failure.type(), failure.remoteTrace());
}
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
failure.type(), type);
}
in -> Proto.INSTANCE.readResponse(request, in)
)
)
);
).getResponseOrThrowException();
}
}

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
@ -19,12 +17,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.jline.utils.AttributedStyle.BOLD;
import static org.jline.utils.AttributedStyle.BRIGHT;
import static org.jline.utils.AttributedStyle.DEFAULT;
import static org.jline.utils.AttributedStyle.RED;
import static org.jline.utils.AttributedStyle.WHITE;
import static org.jline.utils.AttributedStyle.YELLOW;
abstract class ResponseToString {
@ -54,25 +49,11 @@ abstract class ResponseToString {
sb.append(" Version:", DEFAULT.foreground(BRIGHT));
sb.append(info.versionString, DEFAULT.foreground(WHITE));
return sb;
case ERROR:
ErrorResponse err = (ErrorResponse) response;
error("Server error", err.message, sb);
return sb;
case EXCEPTION:
ExceptionResponse ex = (ExceptionResponse) response;
error("Bad request", ex.message, sb);
return sb;
default:
throw new IllegalArgumentException("Unsupported response: " + response);
}
}
private static void error(String type, String message, AttributedStringBuilder sb) {
sb.append(type + " [", BOLD.foreground(RED));
sb.append(message, DEFAULT.boldOff().italic().foreground(YELLOW));
sb.append("]", BOLD.underlineOff().foreground(RED));
}
private static String handleGraphviz(String str) {
try {
// save the content to a temp file

View File

@ -6,11 +6,8 @@
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedStringBuilder;
@ -30,13 +27,6 @@ public class ResponseToStringTests extends ESTestCase {
assertEquals("[37msome command response[0m", fullyStyled(s));
}
public void testExceptionResponse() {
AttributedStringBuilder s = ResponseToString.toAnsi(new ExceptionResponse(RequestType.INFO, "test message", "test cause",
randomFrom(SqlExceptionType.values())));
assertEquals("Bad request [test message]", unstyled(s));
assertEquals("[1;31mBad request [[22;3;33mtest message[1;23;31m][0m", fullyStyled(s));
}
private String unstyled(AttributedStringBuilder s) {
Terminal dumb = mock(Terminal.class);
when(dumb.getType()).thenReturn(Terminal.TYPE_DUMB);

View File

@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractErrorResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
/**
* Response sent when there is a server side error.
*/
public class ErrorResponse extends AbstractErrorResponse {
public ErrorResponse(RequestType requestType, String message, String cause, String stack) {
super(requestType, message, cause, stack);
}
ErrorResponse(Request request, DataInput in) throws IOException {
super((RequestType) request.requestType(), in);
}
@Override
public ResponseType responseType() {
return ResponseType.ERROR;
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
/**
* Response sent when there is a client side error.
*/
public class ExceptionResponse extends AbstractExceptionResponse {
public ExceptionResponse(RequestType requestType, String message, String cause, SqlExceptionType exceptionType) {
super(requestType, message, cause, exceptionType);
}
ExceptionResponse(Request request, DataInput in) throws IOException {
super((RequestType) request.requestType(), in);
}
@Override
public ResponseType responseType() {
return ResponseType.EXCEPTION;
}
}

View File

@ -13,7 +13,7 @@ import java.io.IOException;
/**
* Binary protocol for the JDBC. All backwards compatibility is done using the
* version number sent in the header.
* version number sent in the header.
*/
public final class Proto extends AbstractProto {
public static final Proto INSTANCE = new Proto();
@ -66,8 +66,6 @@ public final class Proto extends AbstractProto {
}
public enum ResponseType implements AbstractProto.ResponseType {
EXCEPTION(ExceptionResponse::new),
ERROR(ErrorResponse::new),
INFO(InfoResponse::new),
META_TABLE(MetaTableResponse::new),
META_COLUMN(MetaColumnResponse::new),
@ -101,4 +99,4 @@ public final class Proto extends AbstractProto {
return reader;
}
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
public class ErrorResponseTests extends ESTestCase {
static ErrorResponse randomErrorResponse() {
return new ErrorResponse(RequestType.META_TABLE, randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(MetaTableRequestTests::randomMetaTableRequest, randomErrorResponse());
}
public void testToString() {
assertEquals("ErrorResponse<request=[INFO] message=[test] cause=[test] stack=[stack\nstack]>",
new ErrorResponse(RequestType.INFO, "test", "test", "stack\nstack").toString());
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
public class ExceptionResponseTests extends ESTestCase {
static ExceptionResponse randomExceptionResponse() {
return new ExceptionResponse(RequestType.META_TABLE, randomAlphaOfLength(5), randomAlphaOfLength(5),
randomFrom(SqlExceptionType.values()));
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(MetaTableRequestTests::randomMetaTableRequest, randomExceptionResponse());
}
public void testToString() {
assertEquals("ExceptionResponse<request=[INFO] message=[test] cause=[test] type=[SYNTAX]>",
new ExceptionResponse(RequestType.INFO, "test", "test", SqlExceptionType.SYNTAX).toString());
}
}

View File

@ -7,14 +7,11 @@ package org.elasticsearch.xpack.sql.jdbc.net.client;
import org.elasticsearch.xpack.sql.client.shared.ClientException;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException;
import org.elasticsearch.xpack.sql.jdbc.JdbcException;
import org.elasticsearch.xpack.sql.jdbc.JdbcSQLException;
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
@ -67,28 +64,14 @@ class HttpClient {
Response put(Request request) throws SQLException {
try {
return AccessController.doPrivileged((PrivilegedAction<Response>) () ->
return AccessController.doPrivileged((PrivilegedAction<ResponseOrException<Response>>) () ->
JreHttpUrlConnection.http(url, cfg, con ->
con.post(
out -> Proto.INSTANCE.writeRequest(request, out),
in -> Proto.INSTANCE.readResponse(request, in),
(status, failure) -> {
if (status >= 500) {
return new ErrorResponse((RequestType) request.requestType(), failure.reason(),
failure.type(), failure.remoteTrace());
}
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
if (type == null) {
return new ErrorResponse((RequestType) request.requestType(),
"Sent bad type [" + failure.type() + "]. Original message is [" + failure.reason() + "]",
failure.type(), failure.remoteTrace());
}
return new ExceptionResponse((RequestType) request.requestType(), failure.reason(),
failure.type(), type);
}
in -> Proto.INSTANCE.readResponse(request, in)
)
)
);
).getResponseOrThrowException();
} catch (ClientException ex) {
throw new JdbcSQLException(ex, "Transport failure");
}

View File

@ -5,10 +5,7 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.client;
import org.elasticsearch.xpack.sql.jdbc.JdbcSQLException;
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
@ -17,18 +14,13 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Page;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
@ -64,7 +56,7 @@ public class JdbcHttpClient {
public Cursor query(String sql, RequestMeta meta) throws SQLException {
int fetch = meta.fetchSize() > 0 ? meta.fetchSize() : conCfg.pageSize();
QueryInitRequest request = new QueryInitRequest(sql, fetch, conCfg.timeZone(), timeout(meta));
QueryInitResponse response = (QueryInitResponse) checkResponse(http.put(request));
QueryInitResponse response = (QueryInitResponse) http.put(request);
return new DefaultCursor(this, response.cursor(), (Page) response.data, meta);
}
@ -74,7 +66,7 @@ public class JdbcHttpClient {
*/
public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException {
QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page);
return ((QueryPageResponse) checkResponse(http.put(request))).cursor();
return ((QueryPageResponse) http.put(request)).cursor();
}
public InfoResponse serverInfo() throws SQLException {
@ -86,17 +78,17 @@ public class JdbcHttpClient {
private InfoResponse fetchServerInfo() throws SQLException {
InfoRequest request = new InfoRequest();
return (InfoResponse) checkResponse(http.put(request));
return (InfoResponse) http.put(request);
}
public List<String> metaInfoTables(String pattern) throws SQLException {
MetaTableRequest request = new MetaTableRequest(pattern);
return ((MetaTableResponse) checkResponse(http.put(request))).tables;
return ((MetaTableResponse) http.put(request)).tables;
}
public List<MetaColumnInfo> metaInfoColumns(String tablePattern, String columnPattern) throws SQLException {
MetaColumnRequest request = new MetaColumnRequest(tablePattern, columnPattern);
return ((MetaColumnResponse) checkResponse(http.put(request))).columns;
return ((MetaColumnResponse) http.put(request)).columns;
}
public void setNetworkTimeout(long millis) {
@ -107,19 +99,6 @@ public class JdbcHttpClient {
return http.getNetworkTimeout();
}
private static Response checkResponse(Response response) throws SQLException {
if (response.responseType() == ResponseType.EXCEPTION) {
ExceptionResponse ex = (ExceptionResponse) response;
throw ex.asException();
}
if (response.responseType() == ResponseType.ERROR) {
ErrorResponse error = (ErrorResponse) response;
// TODO: this could be made configurable to switch between message to error
throw new JdbcSQLException("Server returned error: [" + error.message + "][" + error.stack + "]");
}
return response;
}
private TimeoutInfo timeout(RequestMeta meta) {
// client time
long clientTime = Instant.now().toEpochMilli();

View File

@ -12,14 +12,11 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
@ -29,7 +26,6 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;

View File

@ -21,8 +21,6 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnRequest;
@ -38,7 +36,6 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.type.DataType;

View File

@ -18,15 +18,32 @@ import java.net.Proxy;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.SQLClientInfoException;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLInvalidAuthorizationSpecException;
import java.sql.SQLRecoverableException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTimeoutException;
import java.util.Base64;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import javax.sql.rowset.serial.SerialException;
import static java.util.Collections.emptyMap;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
public class JreHttpUrlConnection implements Closeable {
/**
* State added to {@link SQLException}s when the server encounters an
* error.
*/
public static final String SQL_STATE_BAD_SERVER = "bad_server";
public static <R> R http(URL url, ConnectionConfiguration cfg, Function<JreHttpUrlConnection, R> handler) {
try (JreHttpUrlConnection con = new JreHttpUrlConnection(url, cfg)) {
return handler.apply(con);
@ -102,10 +119,9 @@ public class JreHttpUrlConnection implements Closeable {
}
}
public <R> R post(
public <R> ResponseOrException<R> post(
CheckedConsumer<DataOutput, IOException> doc,
CheckedFunction<DataInput, R, IOException> parser,
BiFunction<Integer, RemoteFailure, R> failureConverter
CheckedFunction<DataInput, R, IOException> parser
) throws ClientException {
try {
con.setRequestMethod("POST");
@ -116,19 +132,58 @@ public class JreHttpUrlConnection implements Closeable {
}
if (con.getResponseCode() < 300) {
try (InputStream stream = getStream(con, con.getInputStream())) {
return parser.apply(new DataInputStream(stream));
return new ResponseOrException<>(parser.apply(new DataInputStream(stream)));
}
}
RemoteFailure failure;
try (InputStream stream = getStream(con, con.getErrorStream())) {
failure = RemoteFailure.parseFromResponse(stream);
}
return failureConverter.apply(con.getResponseCode(), failure);
if (con.getResponseCode() >= 500) {
/*
* Borrowing a page from the HTTP spec, we throw a "transient"
* exception if the server responded with a 500, not because
* we think that the application should retry, but because we
* think that the failure is not the fault of the application.
*/
return new ResponseOrException<>(new SQLException("Server encountered an error ["
+ failure.reason() + "]. [" + failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
}
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
if (type == null) {
return new ResponseOrException<>(new SQLException("Server sent bad type ["
+ failure.type() + "]. Original type was [" + failure.reason() + "]. ["
+ failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
}
return new ResponseOrException<>(type.asException(failure.reason()));
} catch (IOException ex) {
throw new ClientException(ex, "Cannot POST address %s (%s)", url, ex.getMessage());
}
}
public static class ResponseOrException<R> {
private final R response;
private final SQLException exception;
private ResponseOrException(R response) {
this.response = response;
this.exception = null;
}
private ResponseOrException(SQLException exception) {
this.response = null;
this.exception = exception;
}
public R getResponseOrThrowException() throws SQLException {
if (exception != null) {
throw exception;
}
assert response != null;
return response;
}
}
private static InputStream getStream(HttpURLConnection con, InputStream stream) throws IOException {
if (GZIP.equals(con.getContentEncoding())) {
return new GZIPInputStream(stream);
@ -181,4 +236,52 @@ public class JreHttpUrlConnection implements Closeable {
}
}
}
/**
* Exception type.
*/
public enum SqlExceptionType {
UNKNOWN(SQLException::new),
SERIAL(SerialException::new),
CLIENT_INFO(message -> new SQLClientInfoException(message, emptyMap())),
DATA(SQLDataException::new),
SYNTAX(SQLSyntaxErrorException::new),
RECOVERABLE(SQLRecoverableException::new),
TIMEOUT(SQLTimeoutException::new),
SECURITY(SQLInvalidAuthorizationSpecException::new),
NOT_SUPPORTED(SQLFeatureNotSupportedException::new);
public static SqlExceptionType fromRemoteFailureType(String type) {
switch (type) {
case "analysis_exception":
case "resource_not_found_exception":
case "verification_exception":
return DATA;
case "planning_exception":
case "mapping_exception":
return NOT_SUPPORTED;
case "parsing_exception":
return SYNTAX;
case "security_exception":
return SECURITY;
case "timeout_exception":
return TIMEOUT;
default:
return null;
}
}
private final Function<String, SQLException> toException;
SqlExceptionType(Function<String, SQLException> toException) {
this.toException = toException;
}
SQLException asException(String message) {
if (message == null) {
throw new IllegalArgumentException("[message] cannot be null");
}
return toException.apply(message);
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.client.shared;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -70,7 +71,7 @@ public class SSLTests extends ESTestCase {
PathUtils.get(getClass().getResource("/ssl/client.keystore").toURI()).toRealPath().toString());
prop.setProperty("ssl.truststore.pass", "password");
//prop.setProperty("ssl.accept.self.signed.certs", "true");
cfg = new ConnectionConfiguration(prop);
}
@ -106,16 +107,15 @@ public class SSLTests extends ESTestCase {
public void testSslPost() throws Exception {
String message = UUID.randomUUID().toString();
String received = AccessController.doPrivileged((PrivilegedAction<String>) () ->
String received = AccessController.doPrivileged((PrivilegedAction<ResponseOrException<String>>) () ->
JreHttpUrlConnection.http(sslServer, cfg, c ->
c.post(
out -> out.writeUTF(message),
DataInput::readUTF,
(status, failure) -> "failure: [" + status + "][" + failure + "]"
DataInput::readUTF
)
)
);
).getResponseOrThrowException();
assertEquals(message, received);
}
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.RequestType;
import java.io.DataInput;
import java.io.IOException;
import java.util.Objects;
/**
* Response sent when there is a server side error.
*/
public abstract class AbstractErrorResponse extends Response {
private final RequestType requestType;
public final String message, cause, stack;
protected AbstractErrorResponse(RequestType requestType, String message, String cause, String stack) {
this.requestType = requestType;
this.message = message;
this.cause = cause;
this.stack = stack;
}
protected AbstractErrorResponse(RequestType requestType, DataInput in) throws IOException {
this.requestType = requestType;
message = in.readUTF();
cause = in.readUTF();
stack = in.readUTF();
}
@Override
protected final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(message);
out.writeUTF(cause);
out.writeUTF(stack);
}
@Override
public RequestType requestType() {
return requestType;
}
@Override
protected final String toStringBody() {
return "request=[" + requestType
+ "] message=[" + message
+ "] cause=[" + cause
+ "] stack=[" + stack + "]";
}
@Override
public final boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractErrorResponse other = (AbstractErrorResponse) obj;
return Objects.equals(requestType, other.requestType)
&& Objects.equals(message, other.message)
&& Objects.equals(cause, other.cause)
&& Objects.equals(stack, other.stack);
}
@Override
public final int hashCode() {
return Objects.hash(message, cause, stack);
}
}

View File

@ -1,93 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import java.io.DataInput;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Objects;
/**
* Response sent when there is a client side error.
*/
public abstract class AbstractExceptionResponse extends Response {
private final RequestType requestType;
public final String message, cause;
private SqlExceptionType exceptionType;
protected AbstractExceptionResponse(RequestType requestType, String message, String cause, SqlExceptionType exceptionType) {
if (requestType == null) {
throw new IllegalArgumentException("[requestType] cannot be null");
}
if (message == null) {
throw new IllegalArgumentException("[message] cannot be null");
}
if (cause == null) {
throw new IllegalArgumentException("[cause] cannot be null");
}
if (exceptionType == null) {
throw new IllegalArgumentException("[exceptionType] cannot be null");
}
this.requestType = requestType;
this.message = message;
this.cause = cause;
this.exceptionType = exceptionType;
}
protected AbstractExceptionResponse(RequestType requestType, DataInput in) throws IOException {
this.requestType = requestType;
message = in.readUTF();
cause = in.readUTF();
exceptionType = SqlExceptionType.read(in);
}
@Override
protected final void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(message);
out.writeUTF(cause);
exceptionType.writeTo(out);
}
@Override
public RequestType requestType() {
return requestType;
}
@Override
protected final String toStringBody() {
return "request=[" + requestType
+ "] message=[" + message
+ "] cause=[" + cause
+ "] type=[" + exceptionType + "]";
}
@Override
public final boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractExceptionResponse other = (AbstractExceptionResponse) obj;
return Objects.equals(requestType, other.requestType)
&& Objects.equals(message, other.message)
&& Objects.equals(cause, other.cause)
&& Objects.equals(exceptionType, other.exceptionType);
}
@Override
public final int hashCode() {
return Objects.hash(requestType, message, cause, exceptionType);
}
/**
* Build an exception to throw for this failure.
*/
public SQLException asException() {
return exceptionType.asException(message);
}
}

View File

@ -8,18 +8,6 @@ package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.SQLClientInfoException;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLRecoverableException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTimeoutException;
import java.util.function.Function;
import javax.sql.rowset.serial.SerialException;
import static java.util.Collections.emptyMap;
/**
* Base implementation for the binary protocol for the CLI and JDBC.
@ -74,64 +62,6 @@ public abstract class AbstractProto {
return response;
}
/**
* Exception type.
*/
public enum SqlExceptionType {
UNKNOWN(SQLException::new),
SERIAL(SerialException::new),
CLIENT_INFO(message -> new SQLClientInfoException(message, emptyMap())),
DATA(SQLDataException::new),
SYNTAX(SQLSyntaxErrorException::new),
RECOVERABLE(SQLRecoverableException::new),
TIMEOUT(SQLTimeoutException::new),
NOT_SUPPORTED(SQLFeatureNotSupportedException::new);
public static SqlExceptionType fromRemoteFailureType(String type) {
switch (type) {
case "analysis_exception":
case "resource_not_found_exception":
case "verification_exception":
return DATA;
case "planning_exception":
case "mapping_exception":
return NOT_SUPPORTED;
case "parsing_exception":
return SYNTAX;
case "timeout_exception":
return TIMEOUT;
default:
return null;
}
}
private final Function<String, SQLException> toException;
SqlExceptionType(Function<String, SQLException> toException) {
this.toException = toException;
}
public static SqlExceptionType read(DataInput in) throws IOException {
byte b = in.readByte();
try {
return values()[b];
} catch (ArrayIndexOutOfBoundsException e) {
throw new IllegalArgumentException("Unknown request type [" + b + "]", e);
}
}
public void writeTo(DataOutput out) throws IOException {
out.writeByte(ordinal());
}
SQLException asException(String message) {
if (message == null) {
throw new IllegalArgumentException("[message] cannot be null");
}
return toException.apply(message);
}
}
protected abstract RequestType readRequestType(DataInput in) throws IOException;
protected abstract ResponseType readResponseType(DataInput in) throws IOException;
@FunctionalInterface