SQL: binary communication implementation for drivers and the CLI (#48261)

* Introduce binary_format request parameter (binary.format for JDBC) to disable binary
communication between clients (jdbc/odbc) and server.
* for CLI - "binary" command line parameter (or -b) is introduced. Default value is "true".
* binary communication (cbor) is enabled by default
* disabling request parameter introduced for debugging purposes only

(cherry picked from commit f96a5ca61cb9fad9ed59357320af20e669348ce7)
This commit is contained in:
Andrei Stefan 2019-10-31 17:44:34 -04:00 committed by Andrei Stefan
parent 4be54402de
commit 2c73c7dfe3
29 changed files with 931 additions and 139 deletions

View File

@ -25,6 +25,7 @@ dependencies {
transitive = false
}
compile project(':libs:elasticsearch-core')
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
testCompile project(":test:framework")
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')

View File

@ -0,0 +1 @@
8b9826e16c3366764bfb7ad7362554f0471046c3

View File

@ -64,7 +64,8 @@ class JdbcHttpClient {
null,
new RequestInfo(Mode.JDBC),
conCfg.fieldMultiValueLeniency(),
conCfg.indexIncludeFrozen());
conCfg.indexIncludeFrozen(),
conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
}
@ -75,7 +76,7 @@ class JdbcHttpClient {
*/
Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) throws SQLException {
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC));
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC), conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new Tuple<>(response.cursor(), response.rows());
}

View File

@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.client.ConnectionConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
public class JdbcHttpClientRequestTests extends ESTestCase {
private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
private static final Logger logger = LogManager.getLogger(JdbcHttpClientRequestTests.class);
@BeforeClass
public static void init() throws Exception {
webServer.start();
}
@AfterClass
public static void cleanup() {
webServer.close();
}
public void testBinaryRequestEnabled() throws Exception {
assertBinaryRequest(true, XContentType.CBOR);
}
public void testBinaryRequestDisabled() throws Exception {
assertBinaryRequest(false, XContentType.JSON);
}
private void assertBinaryRequest(boolean isBinary, XContentType xContentType) throws Exception {
String url = JdbcConfiguration.URL_PREFIX + webServer.getHostName() + ":" + webServer.getPort();
Properties props = new Properties();
props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
JdbcHttpClient httpClient = new JdbcHttpClient(JdbcConfiguration.create(url, props, 0), false);
prepareMockResponse();
try {
httpClient.query(randomAlphaOfLength(256), null,
new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
prepareMockResponse();
try {
httpClient.nextPage("", new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
}
private void assertValues(boolean isBinary, XContentType xContentType) {
assertEquals(1, webServer.requests().size());
RawRequest recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());
BytesReference bytesRef = recordedRequest.getBodyAsBytes();
Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase("jdbc"));
assertEquals(isBinary, reqContent.get("binary_format"));
}
private void prepareMockResponse() {
webServer.enqueue(new Response()
.setResponseCode(200)
.addHeader("Content-Type", "application/json")
.setBody("{\"rows\":[],\"columns\":[]}"));
}
@SuppressForbidden(reason = "use http server")
private static class RawRequestMockWebServer implements Closeable {
private HttpServer server;
private final Queue<Response> responses = ConcurrentCollections.newQueue();
private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
private String hostname;
private int port;
RawRequestMockWebServer() {
}
void start() throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
server = MockHttpServer.createHttp(address, 0);
server.start();
this.hostname = server.getAddress().getHostString();
this.port = server.getAddress().getPort();
server.createContext("/", s -> {
try {
Response response = responses.poll();
RawRequest request = createRequest(s);
requests.add(request);
s.getResponseHeaders().putAll(response.getHeaders());
if (Strings.isEmpty(response.getBody())) {
s.sendResponseHeaders(response.getStatusCode(), 0);
} else {
byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
if ("HEAD".equals(request.getMethod()) == false) {
try (OutputStream responseBody = s.getResponseBody()) {
responseBody.write(responseAsBytes);
}
}
}
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
s.getRequestMethod(), s.getRequestURI()), e);
} finally {
s.close();
}
});
}
private RawRequest createRequest(HttpExchange exchange) throws IOException {
RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
if (exchange.getRequestBody() != null) {
BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
request.setBodyAsBytes(bytesRef);
}
return request;
}
String getHostName() {
return hostname;
}
int getPort() {
return port;
}
void enqueue(Response response) {
responses.add(response);
}
List<RawRequest> requests() {
return new ArrayList<>(requests);
}
RawRequest takeRequest() {
return requests.poll();
}
@Override
public void close() {
if (server.getExecutor() instanceof ExecutorService) {
terminate((ExecutorService) server.getExecutor());
}
server.stop(0);
}
}
@SuppressForbidden(reason = "use http server header class")
private static class RawRequest {
private final String method;
private final Headers headers;
private BytesReference bodyAsBytes = null;
RawRequest(String method, Headers headers) {
this.method = method;
this.headers = headers;
}
public String getMethod() {
return method;
}
public String getHeader(String name) {
return headers.getFirst(name);
}
public BytesReference getBodyAsBytes() {
return bodyAsBytes;
}
public void setBodyAsBytes(BytesReference bodyAsBytes) {
this.bodyAsBytes = bodyAsBytes;
}
}
@SuppressForbidden(reason = "use http server header class")
private class Response {
private String body = null;
private int statusCode = 200;
private Headers headers = new Headers();
public Response setBody(String body) {
this.body = body;
return this;
}
public Response setResponseCode(int statusCode) {
this.statusCode = statusCode;
return this;
}
public Response addHeader(String name, String value) {
headers.add(name, value);
return this;
}
String getBody() {
return body;
}
int getStatusCode() {
return statusCode;
}
Headers getHeaders() {
return headers;
}
}
}

View File

@ -26,9 +26,9 @@ public class VersionParityTests extends WebServerTestCase {
public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLException {
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.CURRENT));
prepareRequest(version);
prepareResponse(version);
String url = JdbcConfiguration.URL_PREFIX + webServer().getHostName() + ":" + webServer().getPort();
String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
SQLException ex = expectThrows(SQLException.class, () -> new JdbcHttpClient(JdbcConfiguration.create(url, null, 0)));
assertEquals("This version of the JDBC driver is only compatible with Elasticsearch version "
+ org.elasticsearch.xpack.sql.client.Version.CURRENT.toString()
@ -36,7 +36,7 @@ public class VersionParityTests extends WebServerTestCase {
}
public void testNoExceptionThrownForCompatibleVersions() throws IOException {
prepareRequest(null);
prepareResponse(null);
String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
try {
@ -46,7 +46,7 @@ public class VersionParityTests extends WebServerTestCase {
}
}
void prepareRequest(Version version) throws IOException {
void prepareResponse(Version version) throws IOException {
MainResponse response = version == null ? createCurrentVersionMainResponse() : createMainResponse(version);
webServer().enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody(
XContentHelper.toXContent(response, XContentType.JSON, false).utf8ToString()));

View File

@ -11,13 +11,11 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.sql.JDBCType;
import java.util.HashMap;
@ -27,6 +25,7 @@ import java.util.Map;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;
@ -101,9 +100,7 @@ public class RestSqlMultinodeIT extends ESRestTestCase {
}
private Map<String, Object> responseToMap(Response response) throws IOException {
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return toMap(response, "plain");
}
private void assertCount(RestClient client, int count) throws IOException {
@ -114,7 +111,7 @@ public class RestSqlMultinodeIT extends ESRestTestCase {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
request.setJsonEntity("{\"query\": \"SELECT COUNT(*) FROM test\"" + mode(mode) + "}");
Map<String, Object> actual = responseToMap(client.performRequest(request));
Map<String, Object> actual = toMap(client.performRequest(request), mode);
if (false == expected.equals(actual)) {
NotEqualMessageBuilder message = new NotEqualMessageBuilder();

View File

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa.multi_node;
import org.elasticsearch.xpack.sql.qa.SqlProtocolTestCase;
public class SqlProtocolIT extends SqlProtocolTestCase {
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.cbor.CborXContent;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.hamcrest.Matcher;
@ -70,10 +71,10 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
String mode = randomMode();
Map<String, Object> adminResponse = runSql(null,
new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}",
ContentType.APPLICATION_JSON));
ContentType.APPLICATION_JSON), mode);
Map<String, Object> otherResponse = runSql(user,
new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}",
ContentType.APPLICATION_JSON));
ContentType.APPLICATION_JSON), mode);
String adminCursor = (String) adminResponse.remove("cursor");
String otherCursor = (String) otherResponse.remove("cursor");
@ -82,9 +83,9 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
assertResponse(adminResponse, otherResponse);
while (true) {
adminResponse = runSql(null,
new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
otherResponse = runSql(user,
new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
adminCursor = (String) adminResponse.remove("cursor");
otherCursor = (String) otherResponse.remove("cursor");
assertResponse(adminResponse, otherResponse);
@ -179,10 +180,10 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
}
private static Map<String, Object> runSql(@Nullable String asUser, String mode, String sql) throws IOException {
return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
}
private static Map<String, Object> runSql(@Nullable String asUser, HttpEntity entity) throws IOException {
private static Map<String, Object> runSql(@Nullable String asUser, HttpEntity entity, String mode) throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
if (asUser != null) {
RequestOptions.Builder options = request.getOptions().toBuilder();
@ -190,7 +191,7 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
request.setOptions(options);
}
request.setEntity(entity);
return toMap(client().performRequest(request));
return toMap(client().performRequest(request), mode);
}
private static void assertResponse(Map<String, Object> expected, Map<String, Object> actual) {
@ -201,9 +202,13 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
}
}
private static Map<String, Object> toMap(Response response) throws IOException {
private static Map<String, Object> toMap(Response response, String mode) throws IOException {
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
if (mode.equalsIgnoreCase("jdbc")) {
return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
} else {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}
}
}
@ -226,15 +231,17 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
public void testHijackScrollFails() throws Exception {
createUser("full_access", "rest_minimal");
String mode = randomMode();
Map<String, Object> adminResponse = RestActions.runSql(null,
new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(randomMode()) + "}",
ContentType.APPLICATION_JSON));
new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(mode) + "}",
ContentType.APPLICATION_JSON), mode);
String cursor = (String) adminResponse.remove("cursor");
assertNotNull(cursor);
final String m = randomMode();
ResponseException e = expectThrows(ResponseException.class, () -> RestActions.runSql("full_access",
new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(randomMode()) + "}", ContentType.APPLICATION_JSON)));
new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(m) + "}", ContentType.APPLICATION_JSON), m));
// TODO return a better error message for bad scrolls
assertThat(e.getMessage(), containsString("No search context found for id"));
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());

View File

@ -6,16 +6,13 @@
package org.elasticsearch.xpack.sql.qa.security;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
@ -25,7 +22,6 @@ import org.junit.Rule;
import org.junit.rules.TestName;
import java.io.IOException;
import java.io.InputStream;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Arrays;
@ -36,6 +32,7 @@ import java.util.Map;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;
@ -174,18 +171,14 @@ public class UserFunctionIT extends ESRestTestCase {
}
private Map<String, Object> runSql(String asUser, String mode, String sql) throws IOException {
return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
}
private Map<String, Object> runSql(String asUser, HttpEntity entity) throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
if (asUser != null) {
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("es-security-runas-user", asUser);
request.setOptions(options);
}
request.setEntity(entity);
return toMap(client().performRequest(request));
request.setEntity(new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
return toMap(client().performRequest(request), mode);
}
private void assertResponse(Map<String, Object> expected, Map<String, Object> actual) {
@ -196,12 +189,6 @@ public class UserFunctionIT extends ESRestTestCase {
}
}
private static Map<String, Object> toMap(Response response) throws IOException {
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}
private void index(String... docs) throws IOException {
Request request = new Request("POST", "/test/_bulk");
request.addParameter("refresh", "true");

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.qa.single_node;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase;
import java.io.IOException;
@ -22,9 +20,7 @@ public class RestSqlIT extends RestSqlTestCase {
public void testErrorMessageForTranslatingQueryWithWhereEvaluatingToFalse() throws IOException {
index("{\"foo\":1}");
expectBadRequest(() -> runSql(
new StringEntity("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}",
ContentType.APPLICATION_JSON), "/translate/"),
expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}"),
containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " +
"to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: " +
"[SELECT * FROM test WHERE foo = 1 AND foo = 2]"));
@ -32,18 +28,14 @@ public class RestSqlIT extends RestSqlTestCase {
public void testErrorMessageForTranslatingQueryWithLocalExecution() throws IOException {
index("{\"foo\":1}");
expectBadRequest(() -> runSql(
new StringEntity("{\"query\":\"SELECT SIN(PI())\"}",
ContentType.APPLICATION_JSON), "/translate/"),
expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT SIN(PI())\"}"),
containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " +
"to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: [SELECT SIN(PI())]"));
}
public void testErrorMessageForTranslatingSQLCommandStatement() throws IOException {
index("{\"foo\":1}");
expectBadRequest(() -> runSql(
new StringEntity("{\"query\":\"SHOW FUNCTIONS\"}",
ContentType.APPLICATION_JSON), "/translate/"),
expectBadRequest(() -> runTranslateSql("{\"query\":\"SHOW FUNCTIONS\"}"),
containsString("Cannot generate a query DSL for a special SQL command " +
"(e.g.: DESCRIBE, SHOW), sql statement: [SHOW FUNCTIONS]"));
}

View File

@ -117,6 +117,60 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
"PT2H43M59.163S", "+0 02:43:59.163", 23);
}
/**
* Method that tests that a binary response (CBOR) will return either Float or Double, depending on the SQL data type, for floating
* point numbers, while JSON will always return Double for floating point numbers.
*/
public void testFloatingPointNumbersReturnTypes() throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
for (Mode mode : Mode.values()) {
assertFloatingPointNumbersReturnTypes(request, mode);
}
}
@SuppressWarnings({ "unchecked" })
private void assertFloatingPointNumbersReturnTypes(Request request, Mode mode) throws IOException {
String requestContent = "{\"query\":\"SELECT "
+ "CAST(1234.34 AS REAL) AS float_positive,"
+ "CAST(-1234.34 AS REAL) AS float_negative,"
+ "1234567890123.34 AS double_positive,"
+ "-1234567890123.34 AS double_negative\""
+ mode(mode.toString()) + "}";
request.setEntity(new StringEntity(requestContent, ContentType.APPLICATION_JSON));
Map<String, Object> map;
boolean isBinaryResponse = mode != Mode.PLAIN;
Response response = client().performRequest(request);
if (isBinaryResponse == true) {
map = XContentHelper.convertToMap(CborXContent.cborXContent, response.getEntity().getContent(), false);
} else {
map = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
}
List<Object> columns = (ArrayList<Object>) map.get("columns");
assertEquals(4, columns.size());
List<Object> rows = (ArrayList<Object>) map.get("rows");
assertEquals(1, rows.size());
List<Object> row = (ArrayList<Object>) rows.get(0);
assertEquals(4, row.size());
if (isBinaryResponse == true) {
assertTrue(row.get(0) instanceof Float);
assertEquals(row.get(0), 1234.34f);
assertTrue(row.get(1) instanceof Float);
assertEquals(row.get(1), -1234.34f);
} else {
assertTrue(row.get(0) instanceof Double);
assertEquals(row.get(0), 1234.34d);
assertTrue(row.get(1) instanceof Double);
assertEquals(row.get(1), -1234.34d);
}
assertTrue(row.get(2) instanceof Double);
assertEquals(row.get(2), 1234567890123.34d);
assertTrue(row.get(3) instanceof Double);
assertEquals(row.get(3), -1234567890123.34d);
}
private void assertQuery(String sql, String columnName, String columnType, Object columnValue, int displaySize)
throws IOException {
assertQuery(sql, columnName, columnType, columnValue, null, displaySize);
@ -196,6 +250,18 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
.insert(requestContent.length() - 1, ",\"columnar\":" + columnar).toString();
}
// randomize binary response enforcement for drivers (ODBC/JDBC) and CLI
boolean binaryCommunication = randomBoolean();
Mode m = Mode.fromString(mode);
if (randomBoolean()) {
// set it explicitly or leave the default (null) as is
requestContent = new StringBuilder(requestContent)
.insert(requestContent.length() - 1, ",\"binary_format\":" + binaryCommunication).toString();
binaryCommunication = ((Mode.isDriver(m) || m == Mode.CLI) && binaryCommunication == true);
} else {
binaryCommunication = Mode.isDriver(m) || m == Mode.CLI;
}
// send the query either as body or as request parameter
if (randomBoolean()) {
request.setEntity(new StringEntity(requestContent, ContentType.APPLICATION_JSON));
@ -210,6 +276,9 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
Response response = client().performRequest(request);
try (InputStream content = response.getEntity().getContent()) {
if (binaryCommunication == true) {
return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
}
switch(format) {
case "cbor": {
return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);

View File

@ -101,6 +101,12 @@ public class EmbeddedCli implements Closeable {
args.add("false");
}
args.add("-debug");
if (randomBoolean()) {
args.add("-binary");
args.add(Boolean.toString(randomBoolean()));
}
exec = new Thread(() -> {
try {
/*

View File

@ -7,11 +7,18 @@
package org.elasticsearch.xpack.sql.qa.rest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.cbor.CborXContent;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
public abstract class BaseRestSqlTestCase extends ESRestTestCase {
@ -34,4 +41,30 @@ public abstract class BaseRestSqlTestCase extends ESRestTestCase {
public static String randomMode() {
return randomFrom(StringUtils.EMPTY, "jdbc", "plain");
}
/**
* JSON parser returns floating point numbers as Doubles, while CBOR as their actual type.
* To have the tests compare the correct data type, the floating point numbers types should be passed accordingly, to the comparators.
*/
public static Number xContentDependentFloatingNumberValue(String mode, Number value) {
Mode m = Mode.fromString(mode);
// for drivers and the CLI return the number as is, while for REST cast it implicitly to Double (the JSON standard).
if (Mode.isDriver(m) || m == Mode.CLI) {
return value;
} else {
return value.doubleValue();
}
}
public static Map<String, Object> toMap(Response response, String mode) throws IOException {
Mode m = Mode.fromString(mode);
try (InputStream content = response.getEntity().getContent()) {
// by default, drivers and the CLI respond in binary format
if (Mode.isDriver(m) || m == Mode.CLI) {
return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
} else {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.StringUtils;
import org.elasticsearch.xpack.sql.qa.ErrorsTestCase;
import org.hamcrest.Matcher;
@ -103,15 +104,16 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
+ "\"mode\":\"" + mode + "\", "
+ "\"fetch_size\":2" + columnarParameter(columnar) + "}";
Number value = xContentDependentFloatingNumberValue(mode, 1f);
String cursor = null;
for (int i = 0; i < 20; i += 2) {
Map<String, Object> response;
if (i == 0) {
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "");
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
} else {
columnar = randomBoolean();
response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY);
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
}
Map<String, Object> expected = new HashMap<>();
@ -128,11 +130,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
Arrays.asList("text" + i, "text" + (i + 1)),
Arrays.asList(i, i + 1),
Arrays.asList(Math.sqrt(i), Math.sqrt(i + 1)),
Arrays.asList(1.0, 1.0)));
Arrays.asList(value, value)));
} else {
expected.put("rows", Arrays.asList(
Arrays.asList("text" + i, i, Math.sqrt(i), 1.0),
Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), 1.0)));
Arrays.asList("text" + i, i, Math.sqrt(i), value),
Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), value)));
}
cursor = (String) response.remove("cursor");
assertResponse(expected, response);
@ -146,7 +148,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
expected.put("rows", emptyList());
}
assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY));
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}
@AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
@ -186,10 +188,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
columnInfo(mode, "name", "text", JDBCType.VARCHAR, Integer.MAX_VALUE),
columnInfo(mode, "score", "long", JDBCType.BIGINT, 20),
columnInfo(mode, "SCORE()", "float", JDBCType.REAL, 15)));
Number value = xContentDependentFloatingNumberValue(mode, 1f);
if (columnar) {
expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(1.0)));
expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(value)));
} else {
expected.put("rows", singletonList(Arrays.asList("test", 10, 1.0)));
expected.put("rows", singletonList(Arrays.asList("test", 10, value)));
}
assertResponse(expected, runSql(mode, "SELECT *, SCORE() FROM test ORDER BY SCORE()", columnar));
@ -329,7 +332,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
request.addParameter("error_trace", "true");
request.addParameter("pretty", "true");
request.addParameter("format", format);
request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}",
request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\""
+ mode(randomValueOtherThan("jdbc", () -> randomMode())) + "}",
ContentType.APPLICATION_JSON));
expectBadRequest(() -> {
client().performRequest(request);
@ -381,7 +385,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
private Map<String, Object> runSql(String mode, String sql, String suffix, boolean columnar) throws IOException {
// put an explicit "columnar": false parameter or omit it altogether, it should make no difference
return runSql(new StringEntity("{\"query\":\"" + sql + "\"" + mode(mode) + columnarParameter(columnar) + "}",
ContentType.APPLICATION_JSON), suffix);
ContentType.APPLICATION_JSON), suffix, mode);
}
protected Map<String, Object> runTranslateSql(String sql) throws IOException {
return runSql(new StringEntity(sql, ContentType.APPLICATION_JSON), "/translate/", Mode.PLAIN.toString());
}
private String columnarParameter(boolean columnar) {
@ -392,7 +400,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
}
}
protected Map<String, Object> runSql(HttpEntity sql, String suffix) throws IOException {
protected Map<String, Object> runSql(HttpEntity sql, String suffix, String mode) throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT + suffix);
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
@ -407,11 +415,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
request.setOptions(options);
}
request.setEntity(sql);
Response response = client().performRequest(request);
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return toMap(client().performRequest(request), mode);
}
public void testPrettyPrintingEnabled() throws IOException {
@ -502,8 +506,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
public void testBasicTranslateQuery() throws IOException {
index("{\"test\":\"test\"}", "{\"test\":\"test\"}");
Map<String, Object> response = runSql(new StringEntity("{\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}",
ContentType.APPLICATION_JSON), "/translate/");
Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT * FROM test\"}");
assertEquals(1000, response.get("size"));
@SuppressWarnings("unchecked")
Map<String, Object> source = (Map<String, Object>) response.get("_source");
@ -522,7 +525,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
expected.put("rows", singletonList(singletonList("foo")));
assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT * FROM test\", " +
"\"filter\":{\"match\": {\"test\": \"foo\"}}" + mode(mode) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY));
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}
public void testBasicQueryWithParameters() throws IOException {
@ -543,17 +546,14 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
}
assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT test, ? param FROM test WHERE test = ?\", " +
"\"params\":[{\"type\": \"integer\", \"value\": 10}, {\"type\": \"keyword\", \"value\": \"foo\"}]"
+ mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY));
+ mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}
public void testBasicTranslateQueryWithFilter() throws IOException {
index("{\"test\":\"foo\"}",
"{\"test\":\"bar\"}");
Map<String, Object> response = runSql(
new StringEntity("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}",
ContentType.APPLICATION_JSON), "/translate/"
);
Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}");
assertEquals(response.get("size"), 1000);
@SuppressWarnings("unchecked")
@ -592,10 +592,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
index("{\"salary\":100}",
"{\"age\":20}");
Map<String, Object> response = runSql(
new StringEntity("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) HAVING avg(salary) > 50 LIMIT 10\"}",
ContentType.APPLICATION_JSON), "/translate/"
);
Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) "
+ "HAVING avg(salary) > 50 LIMIT 10\"}");
assertEquals(response.get("size"), 0);
assertEquals(false, response.get("_source"));
@ -811,10 +809,10 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
Map<String, Object> expected = new HashMap<>();
expected.put("rows", emptyList());
assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON),
StringUtils.EMPTY));
StringUtils.EMPTY, Mode.PLAIN.toString()));
Map<String, Object> response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON),
"/close");
"/close", Mode.PLAIN.toString());
assertEquals(true, response.get("succeeded"));
assertEquals(0, getNumberOfSearchContexts("test"));

View File

@ -34,21 +34,25 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
static final ParseField COLUMNAR = new ParseField("columnar");
static final ParseField FIELD_MULTI_VALUE_LENIENCY = new ParseField("field_multi_value_leniency");
static final ParseField INDEX_INCLUDE_FROZEN = new ParseField("index_include_frozen");
static final ParseField BINARY_COMMUNICATION = new ParseField("binary_format");
static {
PARSER.declareString(SqlQueryRequest::cursor, CURSOR);
PARSER.declareBoolean(SqlQueryRequest::columnar, COLUMNAR);
PARSER.declareBoolean(SqlQueryRequest::fieldMultiValueLeniency, FIELD_MULTI_VALUE_LENIENCY);
PARSER.declareBoolean(SqlQueryRequest::indexIncludeFrozen, INDEX_INCLUDE_FROZEN);
PARSER.declareBoolean(SqlQueryRequest::binaryCommunication, BINARY_COMMUNICATION);
}
private String cursor = "";
/*
* Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" parameter).
* Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or
* binary parameter).
* See {@code SqlTranslateRequest.toXContent}
*/
private Boolean columnar = Boolean.FALSE;
private Boolean columnar = Protocol.COLUMNAR;
private Boolean binaryCommunication = Protocol.BINARY_COMMUNICATION;
private boolean fieldMultiValueLeniency = Protocol.FIELD_MULTI_VALUE_LENIENCY;
private boolean indexIncludeFrozen = Protocol.INDEX_INCLUDE_FROZEN;
@ -58,7 +62,7 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
public SqlQueryRequest(String query, List<SqlTypedParamValue> params, QueryBuilder filter, ZoneId zoneId,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, Boolean columnar,
String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
super(query, params, filter, zoneId, fetchSize, requestTimeout, pageTimeout, requestInfo);
this.cursor = cursor;
this.columnar = columnar;
@ -109,7 +113,6 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
return this;
}
public SqlQueryRequest fieldMultiValueLeniency(boolean leniency) {
this.fieldMultiValueLeniency = leniency;
return this;
@ -128,12 +131,22 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
return indexIncludeFrozen;
}
public SqlQueryRequest binaryCommunication(boolean binaryCommunication) {
this.binaryCommunication = binaryCommunication;
return this;
}
public Boolean binaryCommunication() {
return binaryCommunication;
}
public SqlQueryRequest(StreamInput in) throws IOException {
super(in);
cursor = in.readString();
columnar = in.readOptionalBoolean();
fieldMultiValueLeniency = in.readBoolean();
indexIncludeFrozen = in.readBoolean();
binaryCommunication = in.readOptionalBoolean();
}
@Override
@ -143,11 +156,12 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
out.writeOptionalBoolean(columnar);
out.writeBoolean(fieldMultiValueLeniency);
out.writeBoolean(indexIncludeFrozen);
out.writeOptionalBoolean(binaryCommunication);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen);
return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication);
}
@Override
@ -156,7 +170,8 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
&& Objects.equals(cursor, ((SqlQueryRequest) obj).cursor)
&& Objects.equals(columnar, ((SqlQueryRequest) obj).columnar)
&& fieldMultiValueLeniency == ((SqlQueryRequest) obj).fieldMultiValueLeniency
&& indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen;
&& indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen
&& binaryCommunication == ((SqlQueryRequest) obj).binaryCommunication;
}
@Override
@ -168,8 +183,8 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// This is needed just to test round-trip compatibility with proto.SqlQueryRequest
return new org.elasticsearch.xpack.sql.proto.SqlQueryRequest(query(), params(), zoneId(), fetchSize(), requestTimeout(),
pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen())
.toXContent(builder, params);
pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen(),
binaryCommunication()).toXContent(builder, params);
}
public static SqlQueryRequest fromXContent(XContentParser parser) {

View File

@ -70,6 +70,7 @@ public class SqlTranslateRequest extends AbstractSqlQueryRequest {
null,
requestInfo(),
false,
false).toXContent(builder, params);
false,
null).toXContent(builder, params);
}
}

View File

@ -27,7 +27,7 @@ dependencies {
compile xpackProject('plugin:sql:sql-client')
compile xpackProject('plugin:sql:sql-action')
compile project(":libs:elasticsearch-cli")
compile project(':libs:elasticsearch-x-content')
runtime "org.elasticsearch:jna:${versions.jna}"
testCompile project(":test:framework")
}

View File

@ -38,6 +38,7 @@ public class Cli extends LoggingAwareCommand {
private final OptionSpec<String> keystoreLocation;
private final OptionSpec<Boolean> checkOption;
private final OptionSpec<String> connectionString;
private final OptionSpec<Boolean> binaryCommunication;
/**
* Use this VM Options to run in IntelliJ or Eclipse:
@ -80,11 +81,15 @@ public class Cli extends LoggingAwareCommand {
super("Elasticsearch SQL CLI");
this.cliTerminal = cliTerminal;
parser.acceptsAll(Arrays.asList("d", "debug"), "Enable debug logging");
this.binaryCommunication = parser.acceptsAll(Arrays.asList("b", "binary"), "Disable binary communication. "
+ "Enabled by default. Accepts 'true' or 'false' values.")
.withRequiredArg().ofType(Boolean.class)
.defaultsTo(Boolean.parseBoolean(System.getProperty("binary", "true")));
this.keystoreLocation = parser.acceptsAll(
Arrays.asList("k", "keystore_location"),
"Location of a keystore to use when setting up SSL. "
+ "If specified then the CLI will prompt for a keystore password. "
+ "If specified when the uri isn't https then an error is thrown.")
Arrays.asList("k", "keystore_location"),
"Location of a keystore to use when setting up SSL. "
+ "If specified then the CLI will prompt for a keystore password. "
+ "If specified when the uri isn't https then an error is thrown.")
.withRequiredArg().ofType(String.class);
this.checkOption = parser.acceptsAll(Arrays.asList("c", "check"),
"Enable initial connection check on startup")
@ -96,6 +101,7 @@ public class Cli extends LoggingAwareCommand {
@Override
protected void execute(org.elasticsearch.cli.Terminal terminal, OptionSet options) throws Exception {
boolean debug = options.has("d") || options.has("debug");
boolean binary = binaryCommunication.value(options);
boolean checkConnection = checkOption.value(options);
List<String> args = connectionString.values(options);
if (args.size() > 1) {
@ -107,10 +113,10 @@ public class Cli extends LoggingAwareCommand {
throw new UserException(ExitCodes.USAGE, "expecting a single keystore file");
}
String keystoreLocationValue = args.size() == 1 ? args.get(0) : null;
execute(uri, debug, keystoreLocationValue, checkConnection);
execute(uri, debug, binary, keystoreLocationValue, checkConnection);
}
private void execute(String uri, boolean debug, String keystoreLocation, boolean checkConnection) throws Exception {
private void execute(String uri, boolean debug, boolean binary, String keystoreLocation, boolean checkConnection) throws Exception {
CliCommand cliCommand = new CliCommands(
new PrintLogoCommand(),
new ClearScreenCliCommand(),
@ -121,7 +127,7 @@ public class Cli extends LoggingAwareCommand {
);
try {
ConnectionBuilder connectionBuilder = new ConnectionBuilder(cliTerminal);
ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation);
ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation, binary);
CliSession cliSession = new CliSession(new HttpClient(con));
cliSession.setDebug(debug);
if (checkConnection) {

View File

@ -37,9 +37,11 @@ public class ConnectionBuilder {
*
* @param connectionStringArg the connection string to connect to
* @param keystoreLocation the location of the keystore to configure. If null then use the system keystore.
* @param binaryCommunication should the communication between the CLI and server be binary (CBOR)
* @throws UserException if there is a problem with the information provided by the user
*/
public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation) throws UserException {
public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation,
boolean binaryCommunication) throws UserException {
final URI uri;
final String connectionString;
Properties properties = new Properties();
@ -92,6 +94,8 @@ public class ConnectionBuilder {
properties.setProperty(ConnectionConfiguration.AUTH_PASS, password);
}
properties.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(binaryCommunication));
return newConnectionConfiguration(uri, connectionString, properties);
}

View File

@ -21,6 +21,7 @@ public class CliSession {
private int fetchSize = Protocol.FETCH_SIZE;
private String fetchSeparator = "";
private boolean debug;
private boolean binary;
public CliSession(HttpClient httpClient) {
this.httpClient = httpClient;
@ -57,6 +58,14 @@ public class CliSession {
return debug;
}
public void setBinary(boolean binary) {
this.binary = binary;
}
public boolean isBinary() {
return binary;
}
public void checkConnection() throws ClientException {
MainResponse response;
try {

View File

@ -26,7 +26,8 @@ public class ConnectionBuilderTests extends ESTestCase {
public void testDefaultConnection() throws Exception {
CliTerminal testTerminal = mock(CliTerminal.class);
ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
ConnectionConfiguration con = connectionBuilder.buildConnection(null, null);
boolean binaryCommunication = randomBoolean();
ConnectionConfiguration con = connectionBuilder.buildConnection(null, null, binaryCommunication);
assertNull(con.authUser());
assertNull(con.authPass());
assertEquals("http://localhost:9200/", con.connectionString());
@ -36,13 +37,14 @@ public class ConnectionBuilderTests extends ESTestCase {
assertEquals(45000, con.pageTimeout());
assertEquals(90000, con.queryTimeout());
assertEquals(1000, con.pageSize());
assertEquals(binaryCommunication, con.binaryCommunication());
verifyNoMoreInteractions(testTerminal);
}
public void testBasicConnection() throws Exception {
CliTerminal testTerminal = mock(CliTerminal.class);
ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
ConnectionConfiguration con = connectionBuilder.buildConnection("http://foobar:9242/", null);
ConnectionConfiguration con = buildConnection(connectionBuilder, "http://foobar:9242/", null);
assertNull(con.authUser());
assertNull(con.authPass());
assertEquals("http://foobar:9242/", con.connectionString());
@ -53,7 +55,7 @@ public class ConnectionBuilderTests extends ESTestCase {
public void testUserAndPasswordConnection() throws Exception {
CliTerminal testTerminal = mock(CliTerminal.class);
ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
ConnectionConfiguration con = connectionBuilder.buildConnection("http://user:pass@foobar:9242/", null);
ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user:pass@foobar:9242/", null);
assertEquals("user", con.authUser());
assertEquals("pass", con.authPass());
assertEquals("http://user:pass@foobar:9242/", con.connectionString());
@ -65,7 +67,7 @@ public class ConnectionBuilderTests extends ESTestCase {
CliTerminal testTerminal = mock(CliTerminal.class);
when(testTerminal.readPassword("password: ")).thenReturn("password");
ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
ConnectionConfiguration con = connectionBuilder.buildConnection("http://user@foobar:9242/", null);
ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user@foobar:9242/", null);
assertEquals("user", con.authUser());
assertEquals("password", con.authPass());
assertEquals("http://user@foobar:9242/", con.connectionString());
@ -99,7 +101,7 @@ public class ConnectionBuilderTests extends ESTestCase {
return null;
}
};
assertNull(connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location"));
assertNull(buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location"));
assertTrue(called.get());
verify(testTerminal, times(2)).readPassword(any());
verifyNoMoreInteractions(testTerminal);
@ -111,7 +113,7 @@ public class ConnectionBuilderTests extends ESTestCase {
when(testTerminal.readPassword("password: ")).thenThrow(ue);
ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
UserException actual = expectThrows(UserException.class, () ->
connectionBuilder.buildConnection("http://user@foobar:9242/", null));
buildConnection(connectionBuilder, "http://user@foobar:9242/", null));
assertSame(actual, ue);
}
@ -127,7 +129,12 @@ public class ConnectionBuilderTests extends ESTestCase {
}
};
UserException actual = expectThrows(UserException.class, () ->
connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location"));
buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location"));
assertSame(actual, ue);
}
private ConnectionConfiguration buildConnection(ConnectionBuilder builder, String connectionStringArg,
String keystoreLocation) throws UserException {
return builder.buildConnection(connectionStringArg, keystoreLocation, randomBoolean());
}
}

View File

@ -11,6 +11,7 @@ dependencies {
compile xpackProject('plugin:sql:sql-proto')
compile "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
testCompile project(":test:framework")
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
}
dependencyLicenses {

View File

@ -31,7 +31,11 @@ public class ConnectionConfiguration {
// Validation
public static final String PROPERTIES_VALIDATION = "validate.properties";
public static final String PROPERTIES_VALIDATION_DEFAULT = "true";
private static final String PROPERTIES_VALIDATION_DEFAULT = "true";
// Binary communication
public static final String BINARY_COMMUNICATION = "binary.format";
private static final String BINARY_COMMUNICATION_DEFAULT = "true";
// Timeouts
@ -63,8 +67,8 @@ public class ConnectionConfiguration {
public static final String AUTH_PASS = "password";
protected static final Set<String> OPTION_NAMES = new LinkedHashSet<>(
Arrays.asList(PROPERTIES_VALIDATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT, PAGE_SIZE,
AUTH_USER, AUTH_PASS));
Arrays.asList(PROPERTIES_VALIDATION, BINARY_COMMUNICATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT,
PAGE_SIZE, AUTH_USER, AUTH_PASS));
static {
OPTION_NAMES.addAll(SslConfig.OPTION_NAMES);
@ -72,6 +76,7 @@ public class ConnectionConfiguration {
}
private final boolean validateProperties;
private final boolean binaryCommunication;
// Base URI for all request
private final URI baseURI;
@ -100,6 +105,9 @@ public class ConnectionConfiguration {
checkPropertyNames(settings, optionNames());
}
binaryCommunication = parseValue(BINARY_COMMUNICATION, settings.getProperty(BINARY_COMMUNICATION, BINARY_COMMUNICATION_DEFAULT),
Boolean::parseBoolean);
connectTimeout = parseValue(CONNECT_TIMEOUT, settings.getProperty(CONNECT_TIMEOUT, CONNECT_TIMEOUT_DEFAULT), Long::parseLong);
networkTimeout = parseValue(NETWORK_TIMEOUT, settings.getProperty(NETWORK_TIMEOUT, NETWORK_TIMEOUT_DEFAULT), Long::parseLong);
queryTimeout = parseValue(QUERY_TIMEOUT, settings.getProperty(QUERY_TIMEOUT, QUERY_TIMEOUT_DEFAULT), Long::parseLong);
@ -117,10 +125,11 @@ public class ConnectionConfiguration {
this.baseURI = normalizeSchema(baseURI, connectionString, sslConfig.isEnabled());
}
public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, long connectTimeout,
long networkTimeout, long queryTimeout, long pageTimeout, int pageSize, String user, String pass,
SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException {
public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, boolean binaryCommunication,
long connectTimeout, long networkTimeout, long queryTimeout, long pageTimeout, int pageSize,
String user, String pass, SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException {
this.validateProperties = validateProperties;
this.binaryCommunication = binaryCommunication;
this.connectionString = connectionString;
this.connectTimeout = connectTimeout;
this.networkTimeout = networkTimeout;
@ -192,6 +201,10 @@ public class ConnectionConfiguration {
return validateProperties;
}
public boolean binaryCommunication() {
return binaryCommunication;
}
public SslConfig sslConfig() {
return sslConfig;
}

View File

@ -42,12 +42,12 @@ import java.util.function.Function;
*/
public class HttpClient {
private static final XContentType REQUEST_BODY_CONTENT_TYPE = XContentType.JSON;
private final ConnectionConfiguration cfg;
private final XContentType requestBodyContentType;
public HttpClient(ConnectionConfiguration cfg) {
this.cfg = cfg;
this.requestBodyContentType = cfg.binaryCommunication() ? XContentType.CBOR : XContentType.JSON;
}
private NamedXContentRegistry registry = NamedXContentRegistry.EMPTY;
@ -72,7 +72,8 @@ public class HttpClient {
null,
new RequestInfo(Mode.CLI),
false,
false);
false,
cfg.binaryCommunication());
return query(sqlRequest);
}
@ -83,7 +84,7 @@ public class HttpClient {
public SqlQueryResponse nextPage(String cursor) throws SQLException {
// method called only from CLI
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(cfg.queryTimeout()),
TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI));
TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI), cfg.binaryCommunication());
return post(Protocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, SqlQueryResponse::fromXContent);
}
@ -105,7 +106,8 @@ public class HttpClient {
con.request(
(out) -> out.write(requestBytes),
this::readFrom,
"POST"
"POST",
requestBodyContentType.mediaTypeWithoutParameters() // "application/cbor" or "application/json"
)
)).getResponseOrThrowException();
return fromXContent(response.v1(), response.v2(), responseParser);
@ -113,7 +115,7 @@ public class HttpClient {
private boolean head(String path, long timeoutInMs) throws SQLException {
ConnectionConfiguration pingCfg = new ConnectionConfiguration(cfg.baseUri(), cfg.connectionString(), cfg.validateProperties(),
cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(),
cfg.binaryCommunication(), cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(),
cfg.authUser(), cfg.authPass(), cfg.sslConfig(), cfg.proxyConfig());
try {
return AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
@ -137,9 +139,9 @@ public class HttpClient {
return fromXContent(response.v1(), response.v2(), responseParser);
}
private static <Request extends ToXContent> byte[] toXContent(Request xContent) {
private <Request extends ToXContent> byte[] toXContent(Request xContent) {
try(ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
try (XContentBuilder xContentBuilder = new XContentBuilder(REQUEST_BODY_CONTENT_TYPE.xContent(), buffer)) {
try (XContentBuilder xContentBuilder = new XContentBuilder(requestBodyContentType.xContent(), buffer)) {
if (xContent.isFragment()) {
xContentBuilder.startObject();
}

View File

@ -140,11 +140,19 @@ public class JreHttpUrlConnection implements Closeable {
CheckedConsumer<OutputStream, IOException> doc,
CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
String requestMethod
) throws ClientException {
return request(doc, parser, requestMethod, "application/json");
}
public <R> ResponseOrException<R> request(
CheckedConsumer<OutputStream, IOException> doc,
CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
String requestMethod, String contentTypeHeader
) throws ClientException {
try {
con.setRequestMethod(requestMethod);
con.setDoOutput(true);
con.setRequestProperty("Content-Type", "application/json");
con.setRequestProperty("Content-Type", contentTypeHeader);
con.setRequestProperty("Accept", "application/json");
if (doc != null) {
try (OutputStream out = con.getOutputStream()) {

View File

@ -0,0 +1,333 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.client;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlQueryRequest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
public class HttpClientRequestTests extends ESTestCase {
private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
private static final Logger logger = LogManager.getLogger(HttpClientRequestTests.class);
@BeforeClass
public static void init() throws Exception {
webServer.start();
}
@AfterClass
public static void cleanup() {
webServer.close();
}
public void testBinaryRequestForCLIEnabled() throws URISyntaxException {
assertBinaryRequestForCLI(true, XContentType.CBOR);
}
public void testBinaryRequestForCLIDisabled() throws URISyntaxException {
assertBinaryRequestForCLI(false, XContentType.JSON);
}
public void testBinaryRequestForDriversEnabled() throws URISyntaxException {
assertBinaryRequestForDrivers(true, XContentType.CBOR);
}
public void testBinaryRequestForDriversDisabled() throws URISyntaxException {
assertBinaryRequestForDrivers(false, XContentType.JSON);
}
private void assertBinaryRequestForCLI(boolean isBinary, XContentType xContentType) throws URISyntaxException {
String url = "http://" + webServer.getHostName() + ":" + webServer.getPort();
String query = randomAlphaOfLength(256);
int fetchSize = randomIntBetween(1, 100);
Properties props = new Properties();
props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
URI uri = new URI(url);
ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props);
HttpClient httpClient = new HttpClient(conCfg);
prepareMockResponse();
try {
httpClient.basicQuery(query, fetchSize);
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertEquals(1, webServer.requests().size());
RawRequest recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());
BytesReference bytesRef = recordedRequest.getBodyAsBytes();
Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString()));
assertEquals(isBinary, reqContent.get("binary_format"));
assertEquals(Boolean.FALSE, reqContent.get("columnar"));
assertEquals(fetchSize, reqContent.get("fetch_size"));
assertEquals(query, reqContent.get("query"));
assertEquals("90000ms", reqContent.get("request_timeout"));
assertEquals("45000ms", reqContent.get("page_timeout"));
assertEquals("Z", reqContent.get("time_zone"));
prepareMockResponse();
try {
// we don't care what the cursor is, because the ES node that will actually handle the request (as in running an ES search)
// will not see/have access to the "binary_format" response, which is the concern of the first node getting the request
httpClient.nextPage("");
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertEquals(1, webServer.requests().size());
recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());
bytesRef = recordedRequest.getBodyAsBytes();
reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString()));
assertEquals(isBinary, reqContent.get("binary_format"));
assertEquals("90000ms", reqContent.get("request_timeout"));
assertEquals("45000ms", reqContent.get("page_timeout"));
}
private void assertBinaryRequestForDrivers(boolean isBinary, XContentType xContentType) throws URISyntaxException {
String url = "http://" + webServer.getHostName() + ":" + webServer.getPort();
String query = randomAlphaOfLength(256);
Properties props = new Properties();
props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
URI uri = new URI(url);
ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props);
HttpClient httpClient = new HttpClient(conCfg);
Mode mode = randomFrom(Mode.JDBC, Mode.ODBC);
SqlQueryRequest request = new SqlQueryRequest(query,
null,
ZoneId.of("Z"),
randomIntBetween(1, 100),
TimeValue.timeValueMillis(randomNonNegativeLong()),
TimeValue.timeValueMillis(randomNonNegativeLong()),
null,
randomBoolean(),
randomAlphaOfLength(128),
new RequestInfo(mode),
randomBoolean(),
randomBoolean(),
isBinary);
prepareMockResponse();
try {
httpClient.query(request);
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertEquals(1, webServer.requests().size());
RawRequest recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());
BytesReference bytesRef = recordedRequest.getBodyAsBytes();
Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(mode.toString()));
assertEquals(isBinary, reqContent.get("binary_format"));
assertEquals(query, reqContent.get("query"));
assertEquals("Z", reqContent.get("time_zone"));
}
private void prepareMockResponse() {
webServer.enqueue(new Response().setResponseCode(200).addHeader("Content-Type", "application/json").setBody("{\"rows\":[]}"));
}
@SuppressForbidden(reason = "use http server")
private static class RawRequestMockWebServer implements Closeable {
private HttpServer server;
private final Queue<Response> responses = ConcurrentCollections.newQueue();
private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
private String hostname;
private int port;
RawRequestMockWebServer() {
}
void start() throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
server = MockHttpServer.createHttp(address, 0);
server.start();
this.hostname = server.getAddress().getHostString();
this.port = server.getAddress().getPort();
server.createContext("/", s -> {
try {
Response response = responses.poll();
RawRequest request = createRequest(s);
requests.add(request);
s.getResponseHeaders().putAll(response.getHeaders());
if (Strings.isEmpty(response.getBody())) {
s.sendResponseHeaders(response.getStatusCode(), 0);
} else {
byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
if ("HEAD".equals(request.getMethod()) == false) {
try (OutputStream responseBody = s.getResponseBody()) {
responseBody.write(responseAsBytes);
}
}
}
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
s.getRequestMethod(), s.getRequestURI()), e);
} finally {
s.close();
}
});
}
private RawRequest createRequest(HttpExchange exchange) throws IOException {
RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
if (exchange.getRequestBody() != null) {
BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
request.setBodyAsBytes(bytesRef);
}
return request;
}
String getHostName() {
return hostname;
}
int getPort() {
return port;
}
void enqueue(Response response) {
responses.add(response);
}
List<RawRequest> requests() {
return new ArrayList<>(requests);
}
RawRequest takeRequest() {
return requests.poll();
}
@Override
public void close() {
if (server.getExecutor() instanceof ExecutorService) {
terminate((ExecutorService) server.getExecutor());
}
server.stop(0);
}
}
private static class RawRequest {
private final String method;
private final Headers headers;
private BytesReference bodyAsBytes = null;
RawRequest(String method, Headers headers) {
this.method = method;
this.headers = headers;
}
public String getMethod() {
return method;
}
public String getHeader(String name) {
return headers.getFirst(name);
}
public BytesReference getBodyAsBytes() {
return bodyAsBytes;
}
public void setBodyAsBytes(BytesReference bodyAsBytes) {
this.bodyAsBytes = bodyAsBytes;
}
}
private class Response {
private String body = null;
private int statusCode = 200;
private Headers headers = new Headers();
public Response setBody(String body) {
this.body = body;
return this;
}
public Response setResponseCode(int statusCode) {
this.statusCode = statusCode;
return this;
}
public Response addHeader(String name, String value) {
headers.add(name, value);
return this;
}
String getBody() {
return body;
}
int getStatusCode() {
return statusCode;
}
Headers getHeaders() {
return headers;
}
}
}

View File

@ -25,6 +25,14 @@ public final class Protocol {
public static final boolean FIELD_MULTI_VALUE_LENIENCY = false;
public static final boolean INDEX_INCLUDE_FROZEN = false;
/*
* Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or
* binary parameter).
* See {@code SqlTranslateRequest.toXContent}
*/
public static final Boolean COLUMNAR = Boolean.FALSE;
public static final Boolean BINARY_COMMUNICATION = null;
/**
* SQL-related endpoints
*/

View File

@ -34,10 +34,12 @@ public class SqlQueryRequest extends AbstractSqlRequest {
private final List<SqlTypedParamValue> params;
private final boolean fieldMultiValueLeniency;
private final boolean indexIncludeFrozen;
private final Boolean binaryCommunication;
public SqlQueryRequest(String query, List<SqlTypedParamValue> params, ZoneId zoneId, int fetchSize,
TimeValue requestTimeout, TimeValue pageTimeout, ToXContent filter, Boolean columnar,
String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen,
Boolean binaryCommunication) {
super(requestInfo);
this.query = query;
this.params = params;
@ -50,11 +52,13 @@ public class SqlQueryRequest extends AbstractSqlRequest {
this.cursor = cursor;
this.fieldMultiValueLeniency = fieldMultiValueLeniency;
this.indexIncludeFrozen = indexIncludeFrozen;
this.binaryCommunication = binaryCommunication;
}
public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo,
boolean binaryCommunication) {
this("", Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, requestTimeout, pageTimeout,
null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN);
null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN, binaryCommunication);
}
/**
@ -131,6 +135,10 @@ public class SqlQueryRequest extends AbstractSqlRequest {
return indexIncludeFrozen;
}
public Boolean binaryCommunication() {
return binaryCommunication;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -143,23 +151,24 @@ public class SqlQueryRequest extends AbstractSqlRequest {
return false;
}
SqlQueryRequest that = (SqlQueryRequest) o;
return fetchSize == that.fetchSize &&
Objects.equals(query, that.query) &&
Objects.equals(params, that.params) &&
Objects.equals(zoneId, that.zoneId) &&
Objects.equals(requestTimeout, that.requestTimeout) &&
Objects.equals(pageTimeout, that.pageTimeout) &&
Objects.equals(filter, that.filter) &&
Objects.equals(columnar, that.columnar) &&
Objects.equals(cursor, that.cursor) &&
fieldMultiValueLeniency == that.fieldMultiValueLeniency &&
indexIncludeFrozen == that.indexIncludeFrozen;
return fetchSize == that.fetchSize
&& Objects.equals(query, that.query)
&& Objects.equals(params, that.params)
&& Objects.equals(zoneId, that.zoneId)
&& Objects.equals(requestTimeout, that.requestTimeout)
&& Objects.equals(pageTimeout, that.pageTimeout)
&& Objects.equals(filter, that.filter)
&& Objects.equals(columnar, that.columnar)
&& Objects.equals(cursor, that.cursor)
&& fieldMultiValueLeniency == that.fieldMultiValueLeniency
&& indexIncludeFrozen == that.indexIncludeFrozen
&& Objects.equals(binaryCommunication, that.binaryCommunication);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), query, zoneId, fetchSize, requestTimeout, pageTimeout,
filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen);
filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication);
}
@Override
@ -171,7 +180,7 @@ public class SqlQueryRequest extends AbstractSqlRequest {
if (clientId() != null) {
builder.field("client_id", clientId());
}
if (this.params.isEmpty() == false) {
if (this.params != null && this.params.isEmpty() == false) {
builder.startArray("params");
for (SqlTypedParamValue val : this.params) {
val.toXContent(builder, params);
@ -203,6 +212,9 @@ public class SqlQueryRequest extends AbstractSqlRequest {
if (indexIncludeFrozen) {
builder.field("index_include_frozen", indexIncludeFrozen);
}
if (binaryCommunication != null) {
builder.field("binary_format", binaryCommunication);
}
if (cursor != null) {
builder.field("cursor", cursor);
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.xpack.sql.action.SqlQueryAction;
import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import java.io.IOException;
@ -65,7 +66,15 @@ public class RestSqlQueryAction extends BaseRestHandler {
* isn't but there is a {@code Accept} header then we use that. If there
* isn't then we use the {@code Content-Type} header which is required.
*/
String accept = request.param("format");
String accept = null;
if ((Mode.isDriver(sqlRequest.requestInfo().mode()) || sqlRequest.requestInfo().mode() == Mode.CLI)
&& (sqlRequest.binaryCommunication() == null || sqlRequest.binaryCommunication() == true)) {
// enforce CBOR response for drivers and CLI (unless instructed differently through the config param)
accept = XContentType.CBOR.name();
} else {
accept = request.param("format");
}
if (accept == null) {
accept = request.header("Accept");
if ("*/*".equals(accept)) {