Add scrolling support to jdbc (elastic/x-pack-elasticsearch#2524)

* Switch `ResultSet#getFetchSize` from returning the *requested*
fetch size to returning the size of the current page of results.
For scrolling searches without parent/child this'll match the
requested fetch size but for other things it won't. The nice thing
about this is that it lets us tell users a thing to look at if
they are wondering why they are using a bunch of memory.
* Remove the entire JDBC action and implement it on the REST
layer instead.
* Create some code that can be shared between the cli and jdbc
actions to properly handle errors and request deserialization so
there isn't as much copy and paste between the two. This helps
because it calls out explicitly the places where they are different.
  * I have not moved the CLI REST handler to shared code because
I think it'd be clearer to make that change in a followup PR.
* There is now no more need for constructs that allow iterating
all of the results in the same request. I have not removed these
because I feel that that cleanup is big enough to deserve its own
PR.

Original commit: elastic/x-pack-elasticsearch@3b12afd11c
This commit is contained in:
Nik Everett 2017-09-25 14:41:46 -04:00 committed by GitHub
parent c31c2af872
commit c7c79bc1c0
42 changed files with 689 additions and 787 deletions

View File

@ -65,7 +65,6 @@ import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackSecurityUser;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlTranslateAction;
@ -490,7 +489,6 @@ public class AuthorizationService extends AbstractComponent {
*/
private static boolean isDelayedIndicesAction(String action) {
return action.equals(SqlAction.NAME) ||
action.equals(JdbcAction.NAME) ||
action.equals(SqlTranslateAction.NAME);
}

View File

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class JdbcActionIT extends AbstractSqlIntegTestCase {

    /**
     * Round trips a {@link MetaTableRequest} through the JDBC action and
     * checks that the freshly created index shows up as the only table.
     */
    public void testJdbcAction() throws Exception {
        assertAcked(client().admin().indices().prepareCreate("test").get());
        client().prepareBulk()
                .add(new IndexRequest("test", "doc", "1").source("data", "bar", "count", 42))
                .add(new IndexRequest("test", "doc", "2").source("data", "baz", "count", 43))
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        ensureYellow("test");

        Request metaRequest = new MetaTableRequest("test");
        JdbcResponse wrapped = client().prepareExecute(JdbcAction.INSTANCE).request(metaRequest).get();
        Response unwrapped = wrapped.response(metaRequest);
        assertThat(unwrapped, instanceOf(MetaTableResponse.class));
        MetaTableResponse tables = (MetaTableResponse) unwrapped;
        assertThat(tables.tables, hasSize(1));
        assertThat(tables.tables.get(0), equalTo("test"));
    }
}

View File

@ -11,6 +11,8 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import java.sql.JDBCType;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@ -33,8 +35,8 @@ public class SqlActionIT extends AbstractSqlIntegTestCase {
assertThat(response.columns(), hasSize(2));
int dataIndex = dataBeforeCount ? 0 : 1;
int countIndex = dataBeforeCount ? 1 : 0;
assertEquals(new ColumnInfo("data", "text"), response.columns().get(dataIndex));
assertEquals(new ColumnInfo("count", "long"), response.columns().get(countIndex));
assertEquals(new ColumnInfo("data", "text", JDBCType.VARCHAR), response.columns().get(dataIndex));
assertEquals(new ColumnInfo("count", "long", JDBCType.BIGINT), response.columns().get(countIndex));
assertThat(response.rows(), hasSize(2));
assertEquals("bar", response.rows().get(0).get(dataIndex));

View File

@ -5,28 +5,54 @@
*/
package org.elasticsearch.xpack.sql;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.AbstractLicensesIntegrationTestCase;
import org.elasticsearch.license.License;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlTranslateAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlTranslateResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.hamcrest.Matchers;
import org.junit.Before;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.license.XPackLicenseStateTests.randomBasicStandardOrGold;
import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialBasicStandardGoldOrPlatinumMode;
import static org.elasticsearch.license.XPackLicenseStateTests.randomTrialOrPlatinumMode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo;
public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
@ -40,6 +66,24 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
enableJdbcLicensing();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
// Add Netty so we can test JDBC licensing because only exists on the REST layer.
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(Netty4Plugin.class);
return plugins;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
// Enable http so we can test JDBC licensing because only exists on the REST layer.
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.build();
}
private static OperationMode randomValidSqlLicenseType() {
return randomTrialBasicStandardGoldOrPlatinumMode();
}
@ -112,6 +156,43 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
assertThat(fetchSource.includes(), Matchers.arrayContaining("data"));
}
/**
 * Checks that the JDBC REST endpoint rejects requests when JDBC is not
 * licensed and accepts the very same request once licensing is enabled.
 */
public void testJdbcActionLicense() throws Exception {
    setupTestIndex();
    disableJdbcLicensing();

    // Without a compliant license the endpoint replies with a protocol-level error response
    Request request = new MetaTableRequest("test");
    Response response = jdbc(request);
    assertThat(response, instanceOf(ErrorResponse.class));
    ErrorResponse er = (ErrorResponse) response;
    assertEquals(ElasticsearchSecurityException.class.getName(), er.cause);
    assertEquals("current license is non-compliant for [jdbc]", er.message);

    // The identical request succeeds after enabling the license
    enableJdbcLicensing();
    response = jdbc(request);
    assertThat(response, instanceOf(MetaTableResponse.class));
}
/**
 * Executes {@code request} against the JDBC REST endpoint the way the JDBC
 * driver would: serialize it with the wire protocol, POST the raw bytes,
 * and deserialize the response bytes back into a protocol {@link Response}.
 */
private Response jdbc(Request request) throws IOException {
    // Convert the request to the HTTP entity that JDBC uses
    HttpEntity entity;
    try (BytesStreamOutput bytes = new BytesStreamOutput()) {
        DataOutput out = new DataOutputStream(bytes);
        Proto.INSTANCE.writeRequest(request, out);
        // deepCopyOf trims the backing array to exactly the written bytes before handing it to HTTP
        // NOTE(review): the payload is protocol binary, not JSON — the JSON content type
        // presumably mirrors what the driver sends; confirm against the driver's HttpClient.
        entity = new ByteArrayEntity(BytesRef.deepCopyOf(bytes.bytes().toBytesRef()).bytes, ContentType.APPLICATION_JSON);
    }
    // Execute
    InputStream response = getRestClient().performRequest("POST", "/_sql/jdbc", emptyMap(), entity).getEntity().getContent();
    // Deserialize bytes to response like JDBC does
    try {
        DataInput in = new DataInputStream(response);
        return Proto.INSTANCE.readResponse(request, in);
    } finally {
        response.close();
    }
}
// TODO test SqlGetIndicesAction. Skipping for now because of lack of serialization support.
private void setupTestIndex() {
@ -122,4 +203,5 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}

View File

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.qa.sql.multinode;
import org.elasticsearch.xpack.qa.sql.jdbc.FetchSizeTestCase;
/** Runs the shared {@link FetchSizeTestCase} tests against the multi-node QA cluster. */
public class JdbcFetchSizeIT extends FetchSizeTestCase {
}

View File

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.qa.sql.nosecurity;
import org.elasticsearch.xpack.qa.sql.jdbc.FetchSizeTestCase;
/** Runs the shared {@link FetchSizeTestCase} tests against the QA cluster without security enabled. */
public class JdbcFetchSizeIT extends FetchSizeTestCase {
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.qa.sql.security;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.qa.sql.jdbc.FetchSizeTestCase;
import java.util.Properties;
/**
 * Runs the shared {@link FetchSizeTestCase} tests against the security-enabled
 * QA cluster, reusing the security settings of the sibling integration tests.
 */
public class JdbcFetchSizeIT extends FetchSizeTestCase {
    @Override
    protected Settings restClientSettings() {
        // Same credentials the other secured REST tests use
        return RestSqlIT.securitySettings();
    }

    @Override
    protected Properties connectionProperties() {
        // Same credentials the other secured JDBC tests use
        return JdbcConnectionIT.securityProperties();
    }
}

View File

@ -6,12 +6,11 @@
package org.elasticsearch.xpack.qa.sql.embed;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
/**
* Internal server used for testing without starting a new Elasticsearch instance.
*/
public class CliHttpServer extends ProtoHttpServer<BytesReference> {
public class CliHttpServer extends ProtoHttpServer {
public CliHttpServer(Client client) {
super(client, new CliProtoHandler(client), "/_sql/cli");
}

View File

@ -8,29 +8,31 @@ package org.elasticsearch.xpack.qa.sql.embed;
import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.RestSqlCliAction;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.DataInput;
import java.io.IOException;
class CliProtoHandler extends ProtoHandler<BytesReference> {
private final NamedWriteableRegistry cursorRegistry = new NamedWriteableRegistry(Cursor.getNamedWriteables());
import static org.mockito.Mockito.mock;
class CliProtoHandler extends ProtoHandler {
private final RestSqlCliAction action;
CliProtoHandler(Client client) {
super(new EmbeddedModeFilterClient(client, planExecutor(client)), r -> r);
super(new EmbeddedModeFilterClient(client, planExecutor(client)));
action = new RestSqlCliAction(Settings.EMPTY, mock(RestController.class));
}
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
FakeRestChannel channel = new FakeRestChannel(new FakeRestRequest(), true, 1);
try {
RestSqlCliAction.operation(cursorRegistry, Proto.INSTANCE.readRequest(in), client).accept(channel);
action.operation(Proto.INSTANCE.readRequest(in), client).accept(channel);
while (false == channel.await()) {}
sendHttpResponse(http, channel.capturedResponse().content());
} catch (Exception e) {

View File

@ -10,9 +10,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
@ -38,6 +43,12 @@ public class EmbeddedModeFilterClient extends FilterClient {
Request request, ActionListener<Response> listener) {
if (action == SqlAction.INSTANCE) {
TransportSqlAction.operation(planExecutor, (SqlRequest) request, (ActionListener<SqlResponse>) listener);
} else if (action == SqlGetIndicesAction.INSTANCE) {
admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(response -> {
SqlGetIndicesAction.operation(new IndexNameExpressionResolver(Settings.EMPTY), EsCatalog::new,
(SqlGetIndicesAction.Request) request, response.getState(),
(ActionListener<SqlGetIndicesAction.Response>) listener);
}, listener::onFailure));
} else {
super.doExecute(action, request, listener);
}

View File

@ -6,15 +6,14 @@
package org.elasticsearch.xpack.qa.sql.embed;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
/**
* Internal server used for testing without starting a new Elasticsearch instance.
*/
public class JdbcHttpServer extends ProtoHttpServer<Response> {
public class JdbcHttpServer extends ProtoHttpServer {
public JdbcHttpServer(Client client) {
super(client, new JdbcProtoHandler(client), "/_jdbc");
super(client, new JdbcProtoHandler(client), "/_sql/jdbc");
}
@Override

View File

@ -8,31 +8,36 @@ package org.elasticsearch.xpack.qa.sql.embed;
import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.plugin.RestSqlJdbcAction;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import java.io.DataInput;
import java.io.IOException;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.mockito.Mockito.mock;
class JdbcProtoHandler extends ProtoHandler<Response> {
private final JdbcServer server;
class JdbcProtoHandler extends ProtoHandler {
private final RestSqlJdbcAction action;
JdbcProtoHandler(Client client) {
super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response));
this.server = new JdbcServer(planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
super(new EmbeddedModeFilterClient(client, planExecutor(client)));
action = new RestSqlJdbcAction(Settings.EMPTY, mock(RestController.class), new SqlLicenseChecker(() -> {}, () -> {}));
}
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
Request req = Proto.INSTANCE.readRequest(in);
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex)));
FakeRestChannel channel = new FakeRestChannel(new FakeRestRequest(), true, 1);
try {
action.operation(Proto.INSTANCE.readRequest(in), client).accept(channel);
while (false == channel.await()) {}
sendHttpResponse(http, channel.capturedResponse().content());
} catch (Exception e) {
fail(http, e);
}
}
}

View File

@ -9,56 +9,43 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {
public abstract class ProtoHandler implements HttpHandler, AutoCloseable {
protected static PlanExecutor planExecutor(Client client) {
Supplier<ClusterState> clusterStateSupplier =
() -> client.admin().cluster().prepareState().get(timeValueMinutes(1)).getState();
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices = (request, listener) -> {
ClusterState state = clusterStateSupplier.get();
SqlGetIndicesAction.operation(new IndexNameExpressionResolver(Settings.EMPTY), EsCatalog::new,
request, state, listener);
};
return new PlanExecutor(client, clusterStateSupplier, getIndices, EsCatalog::new);
return new PlanExecutor(client, clusterStateSupplier, EsCatalog::new);
}
protected static final Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName());
private final TimeValue TV = TimeValue.timeValueSeconds(5);
protected final Client client;
protected final NodeInfo info;
protected final String clusterName;
private final CheckedFunction<R, BytesReference, IOException> toProto;
protected ProtoHandler(Client client, CheckedFunction<R, BytesReference, IOException> toProto) {
protected ProtoHandler(Client client) {
NodesInfoResponse niResponse = client.admin().cluster().prepareNodesInfo("_local").clear().get(TV);
this.client = client;
info = niResponse.getNodes().get(0);
clusterName = niResponse.getClusterName().value();
this.toProto = toProto;
}
@Override
@ -80,13 +67,12 @@ public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {
protected abstract void handle(HttpExchange http, DataInput in) throws IOException;
protected void sendHttpResponse(HttpExchange http, R response) throws IOException {
protected void sendHttpResponse(HttpExchange http, BytesReference response) throws IOException {
// first do the conversion in case an exception is triggered
BytesReference data = toProto.apply(response);
if (http.getResponseHeaders().isEmpty()) {
http.sendResponseHeaders(RestStatus.OK.getStatus(), 0);
}
data.writeTo(http.getResponseBody());
response.writeTo(http.getResponseBody());
http.close();
}

View File

@ -17,15 +17,15 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public abstract class ProtoHttpServer<R> {
public abstract class ProtoHttpServer {
private final ProtoHandler<R> handler;
private final ProtoHandler handler;
private final String protoSuffix;
private final Client client;
private HttpServer server;
private ExecutorService executor;
public ProtoHttpServer(Client client, ProtoHandler<R> handler, String protoSuffix) {
public ProtoHttpServer(Client client, ProtoHandler handler, String protoSuffix) {
this.client = client;
this.handler = handler;
this.protoSuffix = protoSuffix;

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.qa.sql.jdbc;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.junit.Before;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import static java.util.Collections.singletonMap;
/**
* Tests for setting {@link Statement#setFetchSize(int)} and
* {@link ResultSet#getFetchSize()}.
*/
/**
 * Shared tests for {@link Statement#setFetchSize(int)} and
 * {@link ResultSet#getFetchSize()}: scroll-backed queries honor the
 * requested size while aggregations return everything in one batch.
 */
public class FetchSizeTestCase extends JdbcIntegrationTestCase {
    @Before
    public void createTestIndex() throws IOException {
        // Index 20 documents so a fetch size of 4 forces several scroll pages.
        StringBuilder body = new StringBuilder();
        for (int doc = 0; doc < 20; doc++) {
            body.append("{\"index\":{}}\n");
            body.append("{\"test_field\":" + doc + "}\n");
        }
        client().performRequest("PUT", "/test/doc/_bulk", singletonMap("refresh", "true"),
                new StringEntity(body.toString(), ContentType.APPLICATION_JSON));
    }

    /**
     * Test for {@code SELECT} that is implemented as a scroll query.
     * In this case the fetch size should be entirely respected.
     */
    public void testScroll() throws SQLException {
        try (Connection conn = esJdbc();
                Statement stmt = conn.createStatement()) {
            stmt.setFetchSize(4);
            try (ResultSet results = stmt.executeQuery("SELECT * FROM test ORDER BY test_field ASC")) {
                for (int row = 0; row < 20; row++) {
                    // Every batch the server hands back should be exactly the requested size.
                    assertEquals(4, results.getFetchSize());
                    assertTrue(results.next());
                    assertEquals(row, results.getInt(1));
                }
                assertFalse(results.next());
            }
        }
    }

    /**
     * Test for {@code SELECT} that is implemented as an aggregation.
     * In this case the fetch size should be entirely ignored.
     */
    public void testAggregation() throws SQLException {
        try (Connection conn = esJdbc();
                Statement stmt = conn.createStatement()) {
            stmt.setFetchSize(4);
            try (ResultSet results = stmt.executeQuery("SELECT test_field, COUNT(*) FROM test GROUP BY test_field")) {
                for (int row = 0; row < 20; row++) {
                    // Aggregations come back as one batch, so the reported size is the full row count.
                    assertEquals(20, results.getFetchSize());
                    assertTrue(results.next());
                    assertEquals(row, results.getInt(1));
                }
                assertFalse(results.next());
            }
        }
    }
}

View File

@ -1263,5 +1263,10 @@ class JdbcDatabaseMetaData implements DatabaseMetaData, JdbcWrapper {
public Object column(int column) {
return data[row][column];
}
@Override
public int batchSize() {
    // Metadata rows are held in a local array, so everything arrived in one batch.
    return data.length;
}
}
}

View File

@ -398,8 +398,15 @@ class JdbcResultSet implements ResultSet, JdbcWrapper {
@Override
public int getFetchSize() throws SQLException {
/*
* Instead of returning the fetch size the user requested we make a
* stab at returning the fetch size that we actually used, returning
* the batch size of the current row. This allows us to assert these
* batch sizes in testing and lets us point users to something that
* they can use for debugging.
*/
checkOpen();
return (statement != null ? statement.getFetchSize() : 0);
return cursor.batchSize();
}
@Override

View File

@ -212,6 +212,7 @@ class JdbcStatement implements Statement, JdbcWrapper {
@Override
public int getFetchSize() throws SQLException {
checkOpen();
// NOCOMMIT this will return a bad value because we use -1 to mean "default" but default is something defined in connection string
return requestMeta.fetchSize();
}

View File

@ -21,4 +21,10 @@ public interface Cursor {
boolean next() throws SQLException;
Object column(int column);
/**
* Number of rows that this cursor has pulled back from the
* server in the current batch.
*/
int batchSize();
}

View File

@ -52,4 +52,9 @@ class DefaultCursor implements Cursor {
public Object column(int column) {
return page.entry(row, column);
}
@Override
public int batchSize() {
    // The current page holds the rows returned by the most recent server round trip.
    return page.rows();
}
}

View File

@ -59,7 +59,7 @@ class HttpClient {
}
BytesArray put(CheckedConsumer<DataOutput, IOException> os) throws SQLException {
return put("_jdbc?error_trace=true", os); // NOCOMMIT Do something with the error trace. Useful for filing bugs and debugging.
return put("_sql/jdbc?error_trace=true", os); // NOCOMMIT Do something with the error trace. Useful for filing bugs and debugging.
}
BytesArray put(String path, CheckedConsumer<DataOutput, IOException> os) throws SQLException { // NOCOMMIT remove path?

View File

@ -11,7 +11,7 @@ public class RequestMeta {
private long timeoutInMs;
public RequestMeta() {
this(0, 0);
this(-1, 0);
}
public RequestMeta(int fetchSize, int timeout) {

View File

@ -19,25 +19,17 @@ import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class PlanExecutor {
private final Client client;
private final Supplier<ClusterState> stateSupplier;
/**
* The way that we resolve indices asynchronously. This must
* be passed in to support embedded mode. Otherwise we could
* use the {@link #client} directly.
*/
private final BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices;
private final Function<ClusterState, Catalog> catalogSupplier;
private final SqlParser parser;
@ -46,11 +38,9 @@ public class PlanExecutor {
private final Planner planner;
public PlanExecutor(Client client, Supplier<ClusterState> stateSupplier,
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices,
Function<ClusterState, Catalog> catalogSupplier) {
this.client = client;
this.stateSupplier = stateSupplier;
this.getIndices = getIndices;
this.catalogSupplier = catalogSupplier;
this.parser = new SqlParser();
@ -60,7 +50,7 @@ public class PlanExecutor {
}
public SqlSession newSession(SqlSettings settings) {
return new SqlSession(settings, client, getIndices, catalogSupplier.apply(stateSupplier.get()), parser,
return new SqlSession(settings, client, catalogSupplier.apply(stateSupplier.get()), parser,
functionRegistry, optimizer, planner);
}

View File

@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractErrorResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
/**
 * Base class for REST handlers that speak the binary SQL wire protocols
 * (CLI and JDBC): deserializes protocol {@link Request}s from the HTTP body,
 * dispatches them, and serializes {@link Response}s — including translating
 * exceptions into protocol-level error/exception responses.
 */
public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler {
    /** Registry used to read and write {@link Cursor}s so scrolling state can round trip the wire. */
    protected static final NamedWriteableRegistry CURSOR_REGISTRY = new NamedWriteableRegistry(Cursor.getNamedWriteables());

    /** Protocol implementation used to deserialize requests and serialize responses. */
    private final AbstractProto proto;

    protected AbstractSqlProtocolRestAction(Settings settings, AbstractProto proto) {
        super(settings);
        this.proto = proto;
    }

    /**
     * Handle a successfully deserialized request. Implementations dispatch to
     * the appropriate action and reply through the returned consumer.
     */
    protected abstract RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException;

    /** Build the protocol specific response for "expected" SQL failures such as parse errors. */
    protected abstract AbstractExceptionResponse buildExceptionResponse(Request request, String message, String cause,
            SqlExceptionType exceptionType);

    /** Build the protocol specific response for unexpected failures, including the stack trace. */
    protected abstract AbstractErrorResponse buildErrorResponse(Request request, String message, String cause, String stack);

    /**
     * Adapts an {@link ActionListener} so that both success and failure are
     * written back over {@code channel} as protocol responses.
     */
    protected <T> ActionListener<T> toActionListener(Request request, RestChannel channel, Function<T, Response> responseBuilder) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    sendResponse(channel, responseBuilder.apply(response));
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    sendResponse(channel, exceptionResponse(request, e));
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.error("failed to send failure response", inner);
                }
            }
        };
    }

    @Override
    protected final RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
        Request request;
        try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) {
            request = proto.readRequest(in);
        }
        try {
            return innerPrepareRequest(request, client);
        } catch (Exception e) {
            // Deserialization succeeded but dispatch failed: reply with a protocol-level error.
            return channel -> sendResponse(channel, exceptionResponse(request, e));
        }
    }

    /** Translate {@code e} into the protocol's exception or error response. */
    private Response exceptionResponse(Request request, Exception e) {
        // TODO I wonder why we don't just teach the servers to handle ES's normal exception response.....
        SqlExceptionType exceptionType = sqlExceptionType(e);
        // Fixed: the cause string was initialized to EMPTY and then unconditionally
        // overwritten; assign it exactly once instead.
        String cause = e.getClass().getName();
        String message = Strings.hasText(e.getMessage()) ? e.getMessage() : EMPTY;
        if (exceptionType != null) {
            return buildExceptionResponse(request, message, cause, exceptionType);
        } else {
            // Unexpected failure: capture the full stack trace for the error response.
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw));
            return buildErrorResponse(request, message, cause, sw.toString());
        }
    }

    /** Map "expected" exception types to their protocol category; {@code null} means unexpected. */
    private static SqlExceptionType sqlExceptionType(Throwable cause) {
        if (cause instanceof AnalysisException || cause instanceof ResourceNotFoundException) {
            return SqlExceptionType.DATA;
        }
        if (cause instanceof ParsingException) {
            return SqlExceptionType.SYNTAX;
        }
        if (cause instanceof TimeoutException) {
            return SqlExceptionType.TIMEOUT;
        }
        return null;
    }

    /** Serialize {@code response} with the protocol and send it over {@code channel}. */
    private void sendResponse(RestChannel channel, Response response) throws IOException {
        try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
                // NOCOMMIT use the version from the client
                proto.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
            }
            channel.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, bytesStreamOutput.bytes()));
        }
    }
}

View File

@ -61,14 +61,14 @@ public class RestSqlCliAction extends BaseRestHandler {
try (DataInputStream in = new DataInputStream(restRequest.content().streamInput())) {
request = Proto.INSTANCE.readRequest(in);
}
Consumer<RestChannel> consumer = operation(cursorRegistry, request, client);
Consumer<RestChannel> consumer = operation(request, client);
return consumer::accept;
}
/**
* Actual implementation of the operation
*/
public static Consumer<RestChannel> operation(NamedWriteableRegistry cursorRegistry, Request request, Client client)
public Consumer<RestChannel> operation(Request request, Client client)
throws IOException {
RequestType requestType = (RequestType) request.requestType();
switch (requestType) {
@ -80,7 +80,7 @@ public class RestSqlCliAction extends BaseRestHandler {
case QUERY_INIT:
return queryInit(client, (QueryInitRequest) request);
case QUERY_PAGE:
return queryPage(cursorRegistry, client, (QueryPageRequest) request);
return queryPage(client, (QueryPageRequest) request);
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
@ -91,13 +91,13 @@ public class RestSqlCliAction extends BaseRestHandler {
SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, Cursor.EMPTY);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
CliFormatter formatter = new CliFormatter(response);
String data = formatter.formatWithHeader(response);
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
CliFormatter formatter = new CliFormatter(response);
String data = formatter.formatWithHeader(response);
return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
}));
}
private static Consumer<RestChannel> queryPage(NamedWriteableRegistry cursorRegistry, Client client, QueryPageRequest request) {
private Consumer<RestChannel> queryPage(Client client, QueryPageRequest request) {
Cursor cursor;
CliFormatter formatter;
try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), cursorRegistry)) {
@ -109,8 +109,8 @@ public class RestSqlCliAction extends BaseRestHandler {
SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, -1, cursor);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
String data = formatter.formatWithoutHeader(response);
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
String data = formatter.formatWithoutHeader(response);
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), formatter), data);
}));
}

View File

@ -0,0 +1,199 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.main.MainAction;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractExceptionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.util.StringUtils;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.Strings.hasText;
import static org.elasticsearch.rest.RestRequest.Method.POST;
/**
 * REST endpoint implementing the JDBC binary protocol on top of the
 * {@code SqlAction} and {@code SqlGetIndicesAction} transport actions.
 * Registered at {@code POST /_sql/jdbc}.
 */
public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
    private final SqlLicenseChecker sqlLicenseChecker;

    public RestSqlJdbcAction(Settings settings, RestController controller, SqlLicenseChecker sqlLicenseChecker) {
        super(settings, Proto.INSTANCE);
        controller.registerHandler(POST, "/_sql/jdbc", this);
        this.sqlLicenseChecker = sqlLicenseChecker;
    }

    @Override
    public String getName() {
        return "xpack_sql_jdbc_action";
    }

    @Override
    protected RestChannelConsumer innerPrepareRequest(Request request, Client client)
            throws IOException {
        Consumer<RestChannel> consumer = operation(request, client);
        return consumer::accept;
    }

    @Override
    protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) {
        return new ErrorResponse((RequestType) request.requestType(), message, cause, stack);
    }

    @Override
    protected AbstractExceptionResponse buildExceptionResponse(Request request, String message, String cause,
            SqlExceptionType exceptionType) {
        return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType);
    }

    /**
     * Actual implementation of the operation: dispatch on the protocol request
     * type after checking that the license allows JDBC.
     */
    public Consumer<RestChannel> operation(Request request, Client client) throws IOException {
        sqlLicenseChecker.checkIfJdbcAllowed();
        RequestType requestType = (RequestType) request.requestType();
        switch (requestType) {
        case INFO:
            return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), toActionListener(request, channel, response ->
                    new InfoResponse(response.getNodeName(), response.getClusterName().value(),
                            response.getVersion().major, response.getVersion().minor, response.getVersion().toString(),
                            response.getBuild().shortHash(), response.getBuild().date())));
        case META_TABLE:
            return metaTable(client, (MetaTableRequest) request);
        case META_COLUMN:
            return metaColumn(client, (MetaColumnRequest) request);
        case QUERY_INIT:
            return queryInit(client, (QueryInitRequest) request);
        case QUERY_PAGE:
            return queryPage(client, (QueryPageRequest) request);
        default:
            throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
        }
    }

    /** List the indices matching the JDBC table pattern (defaults to all). */
    private Consumer<RestChannel> metaTable(Client client, MetaTableRequest request) {
        String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*";
        SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
        getRequest.local(true); // TODO serialization not supported by get indices action
        return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> {
            return new MetaTableResponse(response.indices().stream()
                    .map(EsIndex::name)
                    .collect(toList()));
        }));
    }

    /**
     * List the columns of indices matching the JDBC table pattern, optionally
     * filtered by a column name pattern.
     */
    private Consumer<RestChannel> metaColumn(Client client, MetaColumnRequest request) {
        String indexPattern = Strings.hasText(request.tablePattern()) ? StringUtils.jdbcToEsPattern(request.tablePattern()) : "*";
        Pattern columnMatcher = hasText(request.columnPattern()) ? StringUtils.likeRegex(request.columnPattern()) : null;
        SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
        getRequest.local(true); // TODO serialization not supported by get indices action
        return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(request, channel, response -> {
            List<MetaColumnInfo> columns = new ArrayList<>();
            for (EsIndex esIndex : response.indices()) {
                // position is 1-based within each index's mapping
                int pos = 0;
                for (Map.Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
                    pos++;
                    String name = entry.getKey();
                    if (columnMatcher == null || columnMatcher.matcher(name).matches()) {
                        DataType type = entry.getValue();
                        columns.add(new MetaColumnInfo(esIndex.name(), name, type.sqlType(), type.precision(), pos));
                    }
                }
            }
            return new MetaColumnResponse(columns);
        }));
    }

    /** Run the first page of a query, returning column metadata, rows, and a cursor. */
    private Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
        SqlRequest sqlRequest = new SqlRequest(request.query, SqlRequest.DEFAULT_TIME_ZONE, request.fetchSize, Cursor.EMPTY);
        sqlRequest.timeZone(DateTimeZone.forTimeZone(request.timeZone));
        long start = System.nanoTime();
        return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
            List<JDBCType> types = new ArrayList<>(response.columns().size());
            List<ColumnInfo> columns = new ArrayList<>(response.columns().size());
            for (SqlResponse.ColumnInfo info : response.columns()) {
                types.add(info.jdbcType());
                columns.add(new ColumnInfo(info.name(), info.jdbcType(), "", "", "", ""));
            }
            // the cursor carries the column types so pages don't have to resend metadata
            return new QueryInitResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types), columns,
                    new SqlResponsePayload(types, response.rows()));
        }));
    }

    /** Fetch the next page for a previously serialized cursor. */
    private Consumer<RestChannel> queryPage(Client client, QueryPageRequest request) {
        Cursor cursor;
        List<JDBCType> types;
        try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(request.cursor).streamInput(), CURSOR_REGISTRY)) {
            cursor = in.readNamedWriteable(Cursor.class);
            types = in.readList(r -> JDBCType.valueOf(r.readVInt()));
        } catch (IOException e) {
            // preserve the cause so the client error carries the real failure
            throw new IllegalArgumentException("error reading the cursor", e);
        }
        SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, 0, cursor);
        long start = System.nanoTime();
        return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
            return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
                    new SqlResponsePayload(types, response.rows()));
        }));
    }

    /**
     * Serialize the cursor together with the column types into an opaque byte
     * array for the client to send back on the next page request. An empty
     * array means there are no more pages.
     */
    private static byte[] serializeCursor(Cursor cursor, List<JDBCType> types) {
        if (cursor == Cursor.EMPTY) {
            return new byte[0];
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeNamedWriteable(cursor);
            out.writeVInt(types.size());
            for (JDBCType type : types) {
                out.writeVInt(type.getVendorTypeNumber());
            }
            return BytesRef.deepCopyOf(out.bytes().toBytesRef()).bytes;
        } catch (IOException e) {
            throw new RuntimeException("unexpected trouble building the cursor", e);
        }
    }
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
@ -27,9 +26,6 @@ import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcHttpHandler;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlTranslateAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
@ -40,7 +36,6 @@ import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -65,13 +60,10 @@ public class SqlPlugin implements ActionPlugin {
if (catalogFilter != null) {
catalog = catalog.andThen(c -> new FilteredCatalog(c, catalogFilter));
}
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices = (request, listener) -> {
client.execute(SqlGetIndicesAction.INSTANCE, request, listener);
};
return Arrays.asList(
new CatalogHolder(catalog),
sqlLicenseChecker,
new PlanExecutor(client, clusterService::state, getIndices, catalog));
new PlanExecutor(client, clusterService::state, catalog));
}
@Override
@ -81,13 +73,12 @@ public class SqlPlugin implements ActionPlugin {
return Arrays.asList(new RestSqlAction(settings, restController),
new RestSqlCliAction(settings, restController),
new JdbcHttpHandler(settings, restController));
new RestSqlJdbcAction(settings, restController, sqlLicenseChecker));
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class),
new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class),
new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class),
new ActionHandler<>(SqlTranslateAction.INSTANCE, TransportSqlTranslateAction.class));
}

View File

@ -3,25 +3,30 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc;
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Payload;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataOutput;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.type.DataType;
import org.joda.time.ReadableInstant;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.List;
public class RowSetPayload implements Payload {
private final RowSet rowSet;
/**
* Implementation {@link Payload} that adapts it to data from
* {@link SqlResponse}.
*/
class SqlResponsePayload implements Payload {
private final List<JDBCType> typeLookup;
private final List<List<Object>> rows;
public RowSetPayload(RowSet rowSet) {
this.rowSet = rowSet;
SqlResponsePayload(List<JDBCType> typeLookup, List<List<Object>> rows) {
this.typeLookup = typeLookup;
this.rows = rows;
}
@Override
@ -31,20 +36,21 @@ public class RowSetPayload implements Payload {
@Override
public void writeTo(SqlDataOutput out) throws IOException {
out.writeInt(rowSet.size());
List<DataType> types = rowSet.schema().types();
out.writeInt(rows.size());
// unroll forEach manually to avoid a Consumer + try/catch for each value...
for (boolean hasRows = rowSet.hasCurrentRow(); hasRows; hasRows = rowSet.advanceRow()) {
for (int i = 0; i < rowSet.rowSize(); i++) {
Object value = rowSet.column(i);
for (List<Object> row : rows) {
for (int c = 0; c < row.size(); c++) {
JDBCType type = typeLookup.get(c);
Object value = row.get(c);
// unpack Joda classes on the server-side to not 'pollute' the common project and thus the client
if (types.get(i).sqlType() == JDBCType.TIMESTAMP && value instanceof ReadableInstant) {
if (type == JDBCType.TIMESTAMP && value instanceof ReadableInstant) {
// NOCOMMIT feels like a hack that'd be better cleaned up another way.
value = ((ReadableInstant) value).getMillis();
}
ProtoUtils.writeValue(out, value, types.get(i).sqlType());
ProtoUtils.writeValue(out, value, type);
}
}
}
}

View File

@ -1,160 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.common.Strings.hasText;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
/**
 * Server-side implementation of the JDBC protocol: dispatches deserialized
 * protocol requests to the {@link PlanExecutor}.
 */
public class JdbcServer extends AbstractSqlServer {
    private final PlanExecutor executor;
    // supplier rather than a value: see the constructor comment below
    private final Supplier<InfoResponse> infoResponse;

    public JdbcServer(PlanExecutor executor, String clusterName, Supplier<String> nodeName, Version version, Build build) {
        this.executor = executor;
        // Delay building the response until runtime because the node name is not available at startup
        this.infoResponse = () -> new InfoResponse(nodeName.get(), clusterName, version.major, version.minor, version.toString(), build.shortHash(), build.date());
    }

    /**
     * Dispatch on the protocol request type. INFO is answered synchronously;
     * everything else completes the listener asynchronously.
     */
    @Override
    protected void innerHandle(Request req, ActionListener<Response> listener) {
        RequestType requestType = (RequestType) req.requestType();
        switch (requestType) {
        case INFO:
            listener.onResponse(info((InfoRequest) req));
            break;
        case META_TABLE:
            metaTable((MetaTableRequest) req, listener);
            break;
        case META_COLUMN:
            metaColumn((MetaColumnRequest) req, listener);
            break;
        case QUERY_INIT:
            queryInit((QueryInitRequest) req, listener);
            break;
        case QUERY_PAGE:
            queryPage((QueryPageRequest) req, listener);
            break;
        default:
            throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
        }
    }

    @Override
    protected ErrorResponse buildErrorResponse(Request request, String message, String cause, String stack) {
        return new ErrorResponse((RequestType) request.requestType(), message, cause, stack);
    }

    @Override
    protected ExceptionResponse buildExceptionResponse(Request request, String message, String cause,
            SqlExceptionType exceptionType) {
        return new ExceptionResponse((RequestType) request.requestType(), message, cause, exceptionType);
    }

    /** Cluster/node/version information; the request itself carries no parameters we use. */
    public InfoResponse info(InfoRequest req) {
        return infoResponse.get();
    }

    /** List index names matching the JDBC table pattern (defaults to all indices). */
    public void metaTable(MetaTableRequest req, ActionListener<Response> listener) {
        String indexPattern = hasText(req.pattern()) ? StringUtils.jdbcToEsPattern(req.pattern()) : "*";
        executor.newSession(SqlSettings.EMPTY)
                .getIndices(new String[] {indexPattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
                    listener.onResponse(new MetaTableResponse(result.stream()
                            .map(EsIndex::name)
                            .collect(toList())));
                }, listener::onFailure));
    }

    /**
     * List columns of indices matching the JDBC table pattern, optionally
     * filtered by a column-name pattern.
     */
    public void metaColumn(MetaColumnRequest req, ActionListener<Response> listener) {
        String pattern = Strings.hasText(req.tablePattern()) ? StringUtils.jdbcToEsPattern(req.tablePattern()) : "*";
        Pattern columnMatcher = hasText(req.columnPattern()) ? StringUtils.likeRegex(req.columnPattern()) : null;
        executor.newSession(SqlSettings.EMPTY)
                .getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
                    List<MetaColumnInfo> resp = new ArrayList<>();
                    for (EsIndex esIndex : result) {
                        // column position is 1-based within each index's mapping
                        int pos = 0;
                        for (Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
                            pos++;
                            if (columnMatcher == null || columnMatcher.matcher(entry.getKey()).matches()) {
                                String name = entry.getKey();
                                String table = esIndex.name();
                                JDBCType tp = entry.getValue().sqlType();
                                int size = entry.getValue().precision();
                                resp.add(new MetaColumnInfo(table, name, tp, size, pos));
                            }
                        }
                    }
                    listener.onResponse(new MetaColumnResponse(resp));
                }, listener::onFailure));
    }

    /** Execute a query and answer with its column metadata and first (only) page of rows. */
    public void queryInit(QueryInitRequest req, ActionListener<Response> listener) {
        final long start = System.nanoTime();
        SqlSettings sqlCfg = new SqlSettings(Settings.builder()
                .put(SqlSettings.PAGE_SIZE, req.fetchSize)
                .put(SqlSettings.TIMEZONE_ID, req.timeZone.getID())
                .build()
        );
        //NOCOMMIT: this should be pushed down to the TransportSqlAction to hook up pagination
        executor.sql(sqlCfg, req.query, wrap(c -> {
            long stop = System.nanoTime();
            List<ColumnInfo> columnInfo = c.schema().stream()
                    .map(e -> new ColumnInfo(e.name(), e.type().sqlType(), EMPTY, EMPTY, EMPTY, EMPTY))
                    .collect(toList());
            // NOCOMMIT paging for jdbc
            // empty cursor: paging is not implemented, the whole result goes in one payload
            // NOTE(review): exceptionResponse appears to come from AbstractSqlServer — confirm
            listener.onResponse(new QueryInitResponse(stop - start, new byte[0], columnInfo, new RowSetPayload(c)));
        }, ex -> listener.onResponse(exceptionResponse(req, ex))));
    }

    /** Paging is not supported by this server; queryInit always returns an empty cursor. */
    public void queryPage(QueryPageRequest req, ActionListener<Response> listener) {
        throw new UnsupportedOperationException();
    }
}

View File

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class JdbcAction extends Action<JdbcRequest, JdbcResponse, JdbcRequestBuilder> {
public static final JdbcAction INSTANCE = new JdbcAction();
public static final String NAME = "indices:data/read/sql/jdbc";
private JdbcAction() {
super(NAME);
}
@Override
public JdbcRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new JdbcRequestBuilder(client, this);
}
@Override
public JdbcResponse newResponse() {
return new JdbcResponse();
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
/**
 * HTTP endpoint for the JDBC binary protocol, registered at {@code POST /_jdbc}.
 * Parses the binary protocol request from the body and forwards it to
 * {@code JdbcAction}.
 */
public class JdbcHttpHandler extends BaseRestHandler { // NOCOMMIT these are called RestJdbcAction even if it isn't REST.
    public JdbcHttpHandler(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(POST, "/_jdbc", this);
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        if (!request.hasContent()) {
            return badProto(StringUtils.EMPTY);
        }
        // Read the protocol request eagerly: the stream is closed when this
        // method returns, so it must not be read inside the returned consumer.
        JdbcRequest jdbcRequest;
        try (DataInputStream in = new DataInputStream(request.content().streamInput())) {
            try {
                jdbcRequest = new JdbcRequest(Proto.INSTANCE.readRequest(in));
            } catch (Exception ex) {
                logger.debug("failed to parse jdbc request", ex);
                return badProto("Unknown message");
            }
        }
        return c -> client.executeLocally(JdbcAction.INSTANCE, jdbcRequest,
                wrap(response -> c.sendResponse(new BytesRestResponse(OK, TEXT_CONTENT_TYPE, response.bytesReference())),
                        ex -> error(c, ex)));
    }

    /** Reply with a 400 for malformed protocol messages. */
    private static RestChannelConsumer badProto(String message) {
        return c -> c.sendResponse(new BytesRestResponse(BAD_REQUEST, TEXT_CONTENT_TYPE, message));
    }

    /**
     * Send a failure back to the client, falling back to the original
     * exception's stack trace if the structured response can't be built.
     */
    private void error(RestChannel channel, Exception ex) {
        logger.debug("failed to parse sql request", ex);
        BytesRestResponse response = null;
        try {
            response = new BytesRestResponse(channel, ex);
        } catch (IOException e) {
            // report the ORIGINAL failure (not the serialization failure) as a 500
            ex.addSuppressed(e);
            response = new BytesRestResponse(INTERNAL_SERVER_ERROR, TEXT_CONTENT_TYPE, ExceptionsHelper.stackTrace(ex));
        }
        channel.sendResponse(response);
    }

    @Override
    public String getName() {
        return "sql_jdbc_action";
    }
}

View File

@ -1,118 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
 * Transport-level request that wraps a serialized JDBC protocol
 * {@link Request} as an opaque {@link BytesReference} so it can travel
 * through the transport layer without the transport needing to understand
 * the protocol.
 */
public class JdbcRequest extends ActionRequest implements CompositeIndicesRequest {
    // protocol request in its serialized form; null until one is set
    private BytesReference bytesReference;

    public JdbcRequest() {
    }

    /**
     * Wrap a protocol request, serializing it immediately.
     *
     * @throws IllegalArgumentException if the request cannot be serialized
     */
    public JdbcRequest(Request request) {
        try {
            request(request);
        } catch (IOException ex) {
            throw new IllegalArgumentException("cannot serialize the request", ex);
        }
    }

    /** Wrap an already serialized protocol request. */
    public JdbcRequest(BytesReference bytesReference) {
        this.bytesReference = bytesReference;
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (bytesReference == null) {
            validationException = addValidationError("no request has been specified", validationException);
        }
        return validationException;
    }

    /**
     * Gets the request object from the internally stored serialized version
     */
    public Request request() throws IOException {
        try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
            return Proto.INSTANCE.readRequest(in);
        }
    }

    /**
     * Converts the request object into the internally stored serialized version
     */
    public JdbcRequest request(Request request) throws IOException {
        try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
                Proto.INSTANCE.writeRequest(request, dataOutputStream);
            }
            bytesReference = bytesStreamOutput.bytes();
        }
        return this;
    }

    /** The serialized protocol request, as stored. */
    public BytesReference bytesReference() {
        return bytesReference;
    }

    @Override
    public int hashCode() {
        return Objects.hash(bytesReference);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        JdbcRequest other = (JdbcRequest) obj;
        return Objects.equals(bytesReference, other.bytesReference);
    }

    @Override
    public String getDescription() {
        try {
            return "SQL JDBC [" + request() + "]";
        } catch (IOException ex) {
            // best effort: the description must never throw
            return "SQL JDBC [" + ex.getMessage() + "]";
        }
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        bytesReference = in.readBytesReference();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeBytesReference(bytesReference);
    }
}

View File

@@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
/**
 * Fluent builder for {@link JdbcRequest}s, wrapping the JDBC protocol
 * {@link Request} in its serialized transport form.
 */
public class JdbcRequestBuilder extends ActionRequestBuilder<JdbcRequest, JdbcResponse, JdbcRequestBuilder> {
    /** Creates a builder with an empty (not yet valid) request. */
    public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action) {
        super(client, action, new JdbcRequest());
    }
    /** Creates a builder pre-populated with the given JDBC protocol request. */
    public JdbcRequestBuilder(ElasticsearchClient client, JdbcAction action, Request req) {
        super(client, action, new JdbcRequest(req));
    }
    /**
     * Replaces the JDBC protocol request carried by the underlying transport request.
     *
     * @throws IOException if the request cannot be serialized
     */
    public JdbcRequestBuilder request(Request req) throws IOException {
        request.request(req);
        return this;
    }
}

View File

@@ -1,92 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
/**
 * Transport-layer response that carries a JDBC protocol {@link Response} in
 * its binary serialized form, so the Elasticsearch transport does not need to
 * understand the JDBC wire protocol itself.
 */
public class JdbcResponse extends ActionResponse {
    // JDBC protocol response serialized via Proto; null for a blank instance
    private BytesReference bytesReference;
    public JdbcResponse() {
    }
    /**
     * Builds a response directly from an already serialized JDBC protocol response.
     */
    public JdbcResponse(BytesReference bytesReference) {
        this.bytesReference = bytesReference;
    }
    /**
     * Builds a response from a JDBC protocol response object, serializing it eagerly.
     *
     * @throws IllegalArgumentException if the response cannot be serialized
     */
    public JdbcResponse(Response response) {
        try {
            response(response);
        } catch (IOException ex) {
            // was "cannot serialize the request" — this class serializes the response
            throw new IllegalArgumentException("cannot serialize the response", ex);
        }
    }
    /**
     * Gets the response object from internally stored serialized version
     *
     * @param request the request that was used to generate this response
     */
    public Response response(Request request) throws IOException {
        try (DataInputStream in = new DataInputStream(bytesReference.streamInput())) {
            return Proto.INSTANCE.readResponse(request, in);
        }
    }
    /**
     * Serialized the response object into internally stored serialized version
     */
    public JdbcResponse response(Response response) throws IOException {
        try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(bytesStreamOutput)) {
                Proto.INSTANCE.writeResponse(response, Proto.CURRENT_VERSION, dataOutputStream);
            }
            bytesReference = bytesStreamOutput.bytes();
        }
        return this;
    }
    /**
     * The serialized form of the JDBC protocol response, as stored internally.
     */
    public BytesReference bytesReference() {
        return bytesReference;
    }
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        bytesReference = in.readBytesReference();
    }
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeBytesReference(bytesReference);
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JdbcResponse that = (JdbcResponse) o;
        return Objects.equals(bytesReference, that.bytesReference);
    }
    @Override
    public int hashCode() {
        return Objects.hash(bytesReference);
    }
}

View File

@@ -1,58 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
/**
 * Transport action that deserializes the JDBC protocol request carried by a
 * {@link JdbcRequest}, checks the license, and hands the request to the
 * {@link JdbcServer} for execution.
 */
public class TransportJdbcAction extends HandledTransportAction<JdbcRequest, JdbcResponse> {
    private final JdbcServer jdbcServer;
    private final SqlLicenseChecker sqlLicenseChecker;
    @Inject
    public TransportJdbcAction(Settings settings, ThreadPool threadPool,
            TransportService transportService, ActionFilters actionFilters,
            IndexNameExpressionResolver indexNameExpressionResolver,
            ClusterService clusterService,
            PlanExecutor planExecutor,
            SqlLicenseChecker sqlLicenseChecker) {
        super(settings, JdbcAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, JdbcRequest::new);
        this.sqlLicenseChecker = sqlLicenseChecker;
        // identify the cluster/node in protocol-level info responses
        this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(),
                () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
    }
    @Override
    protected void doExecute(JdbcRequest jdbcRequest, ActionListener<JdbcResponse> listener) {
        // license gate first so unlicensed clusters fail fast
        sqlLicenseChecker.checkIfJdbcAllowed();
        final Request request;
        try {
            request = jdbcRequest.request();
        } catch (IOException ex) {
            // malformed/undeserializable protocol payload: report, don't throw
            listener.onFailure(ex);
            return;
        }
        // NOCOMMIT looks like this runs on the netty threadpool which might be bad. If we go async immediately it is ok, but we don't.
        jdbcServer.handle(request, chain(listener, JdbcResponse::new));
    }
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -167,29 +168,34 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
*/
public static final class ColumnInfo implements Writeable, ToXContentObject {
private final String name;
private final String type;
private final String esType;
private final JDBCType jdbcType;
public ColumnInfo(String name, String type) {
public ColumnInfo(String name, String esType, JDBCType jdbcType) {
this.name = name;
this.type = type;
this.esType = esType;
this.jdbcType = jdbcType;
}
ColumnInfo(StreamInput in) throws IOException {
name = in.readString();
type = in.readString();
esType = in.readString();
jdbcType = JDBCType.valueOf(in.readVInt());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type);
out.writeString(esType);
out.writeVInt(jdbcType.getVendorTypeNumber());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("name", name);
builder.field("type", type);
builder.field("type", esType);
// TODO include jdbc_type?
return builder.endObject();
}
@ -200,8 +206,18 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
return name;
}
public String type() {
return type;
/**
* The type of the column in Elasticsearch.
*/
public String esType() {
return esType;
}
/**
* The type of the column as it would be returned by a JDBC driver.
*/
public JDBCType jdbcType() {
return jdbcType;
}
@Override
@ -211,12 +227,13 @@ public class SqlResponse extends ActionResponse implements ToXContentObject {
}
ColumnInfo other = (ColumnInfo) obj;
return name.equals(other.name)
&& type.equals(other.type);
&& esType.equals(other.esType)
&& jdbcType.equals(other.jdbcType);
}
@Override
public int hashCode() {
return Objects.hash(name, type);
return Objects.hash(name, esType, jdbcType);
}
@Override

View File

@ -69,7 +69,7 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
if (includeColumnMetadata) {
columns = new ArrayList<>(cursor.schema().types().size());
for (Schema.Entry entry : cursor.schema()) {
columns.add(new ColumnInfo(entry.name(), entry.type().esName()));
columns.add(new ColumnInfo(entry.name(), entry.type().esName(), entry.type().sqlType()));
}
columns = unmodifiableList(columns);
}

View File

@ -22,13 +22,11 @@ import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
public class SqlSession {
private final Client client;
private final BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> sqlGetIndicesAction;
private final Catalog catalog;
private final SqlParser parser;
@ -48,16 +46,14 @@ public class SqlSession {
};
public SqlSession(SqlSession other) {
this(other.defaults(), other.client(), other.sqlGetIndicesAction, other.catalog(), other.parser,
this(other.defaults(), other.client(), other.catalog(), other.parser,
other.functionRegistry(), other.optimizer(), other.planner());
}
public SqlSession(SqlSettings defaults, Client client,
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> sqlGetIndicesAction,
Catalog catalog, SqlParser parser, FunctionRegistry functionRegistry, Optimizer optimizer,
Planner planner) {
this.client = client;
this.sqlGetIndicesAction = sqlGetIndicesAction;
this.catalog = catalog;
this.parser = parser;
@ -86,7 +82,7 @@ public class SqlSession {
*/
public void getIndices(String[] patterns, IndicesOptions options, ActionListener<List<EsIndex>> listener) {
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(options, patterns).local(true);
sqlGetIndicesAction.accept(request, ActionListener.wrap(response -> {
client.execute(SqlGetIndicesAction.INSTANCE, request, ActionListener.wrap(response -> {
listener.onResponse(response.indices());
}, listener::onFailure));
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.sql.JDBCType;
import java.util.Arrays;
import static org.hamcrest.Matchers.arrayWithSize;
@ -17,11 +18,11 @@ import static org.hamcrest.Matchers.arrayWithSize;
public class CliFormatterTests extends ESTestCase {
private final SqlResponse firstResponse = new SqlResponse(Cursor.EMPTY, 10, 5,
Arrays.asList(
new ColumnInfo("foo", "string"),
new ColumnInfo("bar", "long"),
new ColumnInfo("15charwidename!", "double"),
new ColumnInfo("superduperwidename!!!", "double"),
new ColumnInfo("baz", "keyword")),
new ColumnInfo("foo", "string", JDBCType.VARCHAR),
new ColumnInfo("bar", "long", JDBCType.BIGINT),
new ColumnInfo("15charwidename!", "double", JDBCType.DOUBLE),
new ColumnInfo("superduperwidename!!!", "double", JDBCType.DOUBLE),
new ColumnInfo("baz", "keyword", JDBCType.VARCHAR)),
Arrays.asList(
Arrays.asList("15charwidedata!", 1, 6.888, 12, "rabbit"),
Arrays.asList("dog", 1.7976931348623157E308, 123124.888, 9912, "goat")));

View File

@@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcRequest;
/**
 * Round-trip serialization tests for {@link JdbcRequest}.
 */
public class JdbcRequestTests extends AbstractStreamableTestCase<JdbcRequest> {
    @Override
    protected JdbcRequest createTestInstance() {
        // Randomly pick one of the two protocol request flavors to round-trip.
        return randomBoolean()
                ? new JdbcRequest(new InfoRequest(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10),
                        randomAlphaOfLength(10), randomAlphaOfLength(10)))
                : new JdbcRequest(new MetaTableRequest(randomAlphaOfLength(10)));
    }
    @Override
    protected JdbcRequest createBlankInstance() {
        return new JdbcRequest();
    }
}

View File

@@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import java.util.Collections;
/**
 * Round-trip serialization tests for {@link JdbcResponse}.
 */
public class JdbcResponseTests extends AbstractStreamableTestCase<JdbcResponse> {
    @Override
    protected JdbcResponse createTestInstance() {
        // Randomly pick one of the two protocol response flavors to round-trip.
        return randomBoolean()
                ? new JdbcResponse(new InfoResponse(randomAlphaOfLength(10), randomAlphaOfLength(10),
                        randomByte(), randomByte(),
                        randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)))
                : new JdbcResponse(new MetaTableResponse(Collections.singletonList(randomAlphaOfLength(10))));
    }
    @Override
    protected JdbcResponse createBlankInstance() {
        return new JdbcResponse();
    }
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -35,7 +36,7 @@ public class SqlResponseTests extends AbstractStreamableTestCase<SqlResponse> {
if (randomBoolean()) {
columns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
columns.add(new ColumnInfo(randomAlphaOfLength(10), randomAlphaOfLength(10)));
columns.add(new ColumnInfo(randomAlphaOfLength(10), randomAlphaOfLength(10), randomFrom(JDBCType.values())));
}
}