Minor polishing

Make JDBC and CLI consistent with the rest of the plugin
Replace ResultPage with an interface

Original commit: elastic/x-pack-elasticsearch@c62249cc2e
Costin Leau 2017-07-18 18:41:23 +03:00
parent cf29dea577
commit f33ae72712
31 changed files with 220 additions and 168 deletions
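In short, the diff does two things: the CLI and JDBC server classes move from org.elasticsearch.xpack.sql.server.* into org.elasticsearch.xpack.sql.plugin.* so they live alongside the rest of the plugin code, and the abstract ResultPage base class is replaced by a small Payload interface, with the readValue/writeValue/classOf helpers becoming public statics on ProtoUtils. The sketch below is a simplified illustration of the resulting shape, not the literal sources; method bodies are trimmed.

// Simplified illustration only; see the hunks below for the real code.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Payload {                              // jdbc.net.protocol.Payload (new file)
    void read(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
}

class Page implements Payload {                         // client side: previously "extends ResultPage"
    public void read(DataInput in) throws IOException { /* fills the columnar arrays */ }
    public void write(DataOutput out) throws IOException { /* mirror of read */ }
}

class RowSetPayload implements Payload {                // server side: replaces RowSetCursorResultPage
    public void read(DataInput in) throws IOException { throw new UnsupportedOperationException(); }
    public void write(DataOutput out) throws IOException { /* one ProtoUtils.writeValue call per cell */ }
}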

View File

@@ -10,11 +10,11 @@ import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.TestUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.plugin.cli.CliServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.server.cli.CliServer;
import org.elasticsearch.xpack.sql.test.server.ProtoHandler;
import java.io.DataInput;
@@ -28,8 +28,7 @@ class CliProtoHandler extends ProtoHandler<Response> {
CliProtoHandler(Client client) {
super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response));
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(), info.getBuild());
}
@Override

View File

@@ -13,10 +13,13 @@ import java.sql.JDBCType;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils.classOf;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils.readValue;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils.writeValue;
/**
* Stores a page of data in a columnar format.
*/
public class Page extends ResultPage {
public class Page implements Payload {
private final List<ColumnInfo> columnInfo;
/**
@@ -88,7 +91,7 @@ public class Page extends ResultPage {
/**
* Read a value from the stream
*/
void read(DataInput in) throws IOException {
public void read(DataInput in) throws IOException {
int rows = in.readInt();
// this.rows may be less than the number of rows we have space for
if (rows > maxRows) {
@@ -103,7 +106,6 @@ public class Page extends ResultPage {
}
}
@Override
public void write(DataOutput out) throws IOException {
int rows = rows();
out.writeInt(rows);
@@ -132,7 +134,7 @@ public class Page extends ResultPage {
@Override
public boolean equals(Object obj) {
if (obj == null || obj instanceof ResultPage == false) {
if (obj == null || obj instanceof Page == false) {
return false;
}
Page other = (Page) obj;

View File

@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public interface Payload {
void read(DataInput in) throws IOException;
void write(DataOutput out) throws IOException;
}

View File

@@ -13,16 +13,10 @@ import java.sql.Blob;
import java.sql.Clob;
import java.sql.JDBCType;
/**
* Abstract base class for a page of results. The canonical implementation is {@link Page}
* and implementations must write using the same format as {@linkplain Page}.
*/
public abstract class ResultPage {
public abstract void write(DataOutput out) throws IOException;
public class ProtoUtils {
// See Jdbc spec, appendix B
@SuppressWarnings("unchecked")
protected static <T> T readValue(DataInput in, JDBCType type) throws IOException {
public static <T> T readValue(DataInput in, JDBCType type) throws IOException {
// NOCOMMIT <T> feels slippery here
Object result;
byte hasNext = in.readByte();
@@ -82,7 +76,7 @@ public abstract class ResultPage {
return (T) result;
}
protected static void writeValue(DataOutput out, Object o, JDBCType type) throws IOException {
public static void writeValue(DataOutput out, Object o, JDBCType type) throws IOException {
if (o == null) {
out.writeByte(0);
return;
@@ -145,7 +139,7 @@ public abstract class ResultPage {
/**
* The type of the array used to store columns of this type.
*/
protected static Class<?> classOf(JDBCType jdbcType) {
public static Class<?> classOf(JDBCType jdbcType) {
switch (jdbcType) {
case NUMERIC:
case DECIMAL:
@@ -163,7 +157,7 @@ public abstract class ResultPage {
case BIGINT:
return Long.class;
case REAL:
return Float.class;
return Float.class;
case FLOAT:
case DOUBLE:
return Double.class;
@@ -187,4 +181,4 @@ public abstract class ResultPage {
throw new IllegalArgumentException("Unsupported JDBC type [" + jdbcType + "]");
}
}
}
}

View File

@@ -73,4 +73,4 @@ public class QueryInitRequest extends Request {
public int hashCode() {
return Objects.hash(fetchSize, query, timeout, timeZone.getID().hashCode());
}
}
}

View File

@@ -23,10 +23,9 @@ public class QueryInitResponse extends Response {
public final long serverTimeQueryReceived, serverTimeResponseSent;
public final String requestId;
public final List<ColumnInfo> columns;
public final ResultPage data;
public final Payload data;
public QueryInitResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, List<ColumnInfo> columns,
ResultPage data) {
public QueryInitResponse(long serverTimeQueryReceived, long serverTimeResponseSent, String requestId, List<ColumnInfo> columns, Payload data) {
this.serverTimeQueryReceived = serverTimeQueryReceived;
this.serverTimeResponseSent = serverTimeResponseSent;
this.requestId = requestId;
@@ -44,6 +43,7 @@ public class QueryInitResponse extends Response {
columns.add(new ColumnInfo(in));
}
this.columns = unmodifiableList(columns);
//NOCOMMIT - Page is a client class, it shouldn't leak here
Page data = new Page(columns);
data.read(in);
this.data = data;

View File

@@ -17,9 +17,9 @@ import java.util.Objects;
public class QueryPageRequest extends Request {
public final String requestId;
public final TimeoutInfo timeout;
private final transient Page data;
private final transient Payload data;
public QueryPageRequest(String requestId, TimeoutInfo timeout, @Nullable Page data) {
public QueryPageRequest(String requestId, TimeoutInfo timeout, @Nullable Payload data) {
if (requestId == null) {
throw new IllegalArgumentException("[requestId] must not be null");
}
@@ -43,7 +43,7 @@ public class QueryPageRequest extends Request {
timeout.write(out);
}
public Page data() {
public Payload data() {
return data;
}

View File

@@ -17,9 +17,9 @@ import java.util.Objects;
public class QueryPageResponse extends Response {
public final String requestId;
private final ResultPage data;
private final Payload data;
public QueryPageResponse(String requestId, ResultPage data) {
public QueryPageResponse(String requestId, Page data) {
if (requestId == null) {
throw new IllegalArgumentException("[requestId] must not be null");
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.framework;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.sql.jdbc.SqlSpecIT;
import java.net.URL;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
public class DataLoader {
public static void main(String[] args) throws Exception {
RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build();
loadDatasetIntoEs(client);
}
protected static void loadDatasetIntoEs(RestClient client) throws Exception {
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("settings");
{
createIndex.field("number_of_shards", 1);
}
createIndex.endObject();
createIndex.startObject("mappings");
{
createIndex.startObject("emp");
{
createIndex.startObject("properties");
{
createIndex.startObject("emp_no").field("type", "integer").endObject();
createIndex.startObject("birth_date").field("type", "date").endObject();
createIndex.startObject("first_name").field("type", "text").endObject();
createIndex.startObject("last_name").field("type", "text").endObject();
createIndex.startObject("gender").field("type", "keyword").endObject();
createIndex.startObject("hire_date").field("type", "date").endObject();
}
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
client.performRequest("PUT", "/test_emp", emptyMap(), new StringEntity(createIndex.string(), ContentType.APPLICATION_JSON));
StringBuilder bulk = new StringBuilder();
csvToLines("employees", (titles, fields) -> {
bulk.append("{\"index\":{}}\n");
bulk.append('{');
for (int f = 0; f < fields.size(); f++) {
if (f != 0) {
bulk.append(',');
}
bulk.append('"').append(titles.get(f)).append("\":\"").append(fields.get(f)).append('"');
}
bulk.append("}\n");
});
client.performRequest("POST", "/test_emp/emp/_bulk", singletonMap("refresh", "true"), new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
}
private static void csvToLines(String name, CheckedBiConsumer<List<String>, List<String>, Exception> consumeLine) throws Exception {
String location = "/" + name + ".csv";
URL dataSet = SqlSpecIT.class.getResource(location);
if (dataSet == null) {
throw new IllegalArgumentException("Can't find [" + location + "]");
}
List<String> lines = Files.readAllLines(PathUtils.get(dataSet.toURI()));
if (lines.isEmpty()) {
throw new IllegalArgumentException("[" + location + "] must contain at least a title row");
}
List<String> titles = Arrays.asList(lines.get(0).split(","));
for (int l = 1; l < lines.size(); l++) {
consumeLine.accept(titles, Arrays.asList(lines.get(l).split(",")));
}
}
}
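To make the bulk format concrete: for a CSV whose header line is emp_no,first_name and whose first data row is 10001,Georgi (illustrative values), the loop above emits one action line and one document line per row, quoting every field as a JSON string and relying on the index mapping to coerce integers and dates back from strings:

{"index":{}}
{"emp_no":"10001","first_name":"Georgi"}

Note that the quoting is deliberately naive: a value containing a comma or a double quote would break the generated bulk body, which is acceptable for this test fixture.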

View File

@@ -10,15 +10,12 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.sql.jdbc.SqlSpecIT;
import org.junit.ClassRule;
import org.relique.io.TableReader;
import org.relique.jdbc.csv.CsvConnection;
@@ -26,18 +23,14 @@ import org.relique.jdbc.csv.CsvConnection;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.net.URL;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.sql.jdbc.framework.JdbcAssert.assertResultSets;
@@ -117,57 +110,6 @@ public abstract class JdbcIntegrationTestCase extends ESRestTestCase {
}
protected static void loadDatasetIntoEs() throws Exception {
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("settings"); {
createIndex.field("number_of_shards", 1);
}
createIndex.endObject();
createIndex.startObject("mappings"); {
createIndex.startObject("emp");
{
createIndex.startObject("properties"); {
createIndex.startObject("emp_no").field("type", "integer").endObject();
createIndex.startObject("birth_date").field("type", "date").endObject();
createIndex.startObject("first_name").field("type", "text").endObject();
createIndex.startObject("last_name").field("type", "text").endObject();
createIndex.startObject("gender").field("type", "keyword").endObject();
createIndex.startObject("hire_date").field("type", "date").endObject();
}
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
client().performRequest("PUT", "/test_emp", emptyMap(), new StringEntity(createIndex.string(), ContentType.APPLICATION_JSON));
StringBuilder bulk = new StringBuilder();
csvToLines("employees", (titles, fields) -> {
bulk.append("{\"index\":{}}\n");
bulk.append('{');
for (int f = 0; f < fields.size(); f++) {
if (f != 0) {
bulk.append(',');
}
bulk.append('"').append(titles.get(f)).append("\":\"").append(fields.get(f)).append('"');
}
bulk.append("}\n");
});
client().performRequest("POST", "/test_emp/emp/_bulk", singletonMap("refresh", "true"), new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
DataLoader.loadDatasetIntoEs(client());
}
private static void csvToLines(String name, CheckedBiConsumer<List<String>, List<String>, Exception> consumeLine) throws Exception {
String location = "/" + name + ".csv";
URL dataSet = SqlSpecIT.class.getResource(location);
if (dataSet == null) {
throw new IllegalArgumentException("Can't find [" + location + "]");
}
List<String> lines = Files.readAllLines(PathUtils.get(dataSet.toURI()));
if (lines.isEmpty()) {
throw new IllegalArgumentException("[" + location + "] must contain at least a title row");
}
List<String> titles = Arrays.asList(lines.get(0).split(","));
for (int l = 1; l < lines.size(); l++) {
consumeLine.accept(titles, Arrays.asList(lines.get(l).split(",")));
}
}
}
}

View File

@@ -10,11 +10,11 @@ import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.TestUtils;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.server.jdbc.JdbcServer;
import org.elasticsearch.xpack.sql.test.server.ProtoHandler;
import java.io.DataInput;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server;
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;

View File

@@ -24,15 +24,15 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliHttpHandler;
import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcHttpHandler;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.rest.RestSqlAction;
import org.elasticsearch.xpack.sql.server.cli.CliAction;
import org.elasticsearch.xpack.sql.server.cli.CliHttpHandler;
import org.elasticsearch.xpack.sql.server.cli.TransportCliAction;
import org.elasticsearch.xpack.sql.server.jdbc.JdbcAction;
import org.elasticsearch.xpack.sql.server.jdbc.JdbcHttpHandler;
import org.elasticsearch.xpack.sql.server.jdbc.TransportJdbcAction;
import java.util.Arrays;
import java.util.Collection;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@@ -17,10 +17,10 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.TimeZone;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli;
import java.util.concurrent.atomic.AtomicBoolean;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
@@ -15,8 +15,8 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.DataInputStream;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Response;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.cli;
package org.elasticsearch.xpack.sql.plugin.cli.action;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.cli.CliServer;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@@ -26,10 +26,10 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto.SqlExceptionType;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.util.StringUtils;
@@ -46,6 +46,7 @@ import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.common.Strings.coalesceToEmpty;
import static org.elasticsearch.common.Strings.hasText;
import static org.elasticsearch.common.Strings.tokenizeToStringArray;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public class JdbcServer extends AbstractSqlServer {
private final PlanExecutor executor;
@@ -74,7 +75,8 @@ public class JdbcServer extends AbstractSqlServer {
queryInit((QueryInitRequest) req, listener);
break;
case QUERY_PAGE:
// TODO implement me
queryPage((QueryPageRequest) req, listener);
break;
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
@@ -141,16 +143,16 @@ public class JdbcServer extends AbstractSqlServer {
executor.sql(req.query, req.timeZone, wrap(c -> {
long stop = System.currentTimeMillis();
String requestId = "";
String requestId = EMPTY;
if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) {
requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId());
}
List<ColumnInfo> columnInfo = c.schema().stream()
.map(e -> new ColumnInfo(e.name(), e.type().sqlType(), "", "", "", ""))
.map(e -> new ColumnInfo(e.name(), e.type().sqlType(), EMPTY, EMPTY, EMPTY, EMPTY))
.collect(toList());
listener.onResponse(new QueryInitResponse(start, stop, requestId, columnInfo, new RowSetCursorResultPage(c)));
listener.onResponse(new QueryInitResponse(start, stop, requestId, columnInfo, new RowSetPayload(c)));
}, ex -> listener.onResponse(exceptionResponse(req, ex))));
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.jdbc;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Payload;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.type.DataType;
import org.joda.time.ReadableInstant;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.List;
public class RowSetPayload implements Payload {
private final RowSet rowSet;
public RowSetPayload(RowSet rowSet) {
this.rowSet = rowSet;
}
@Override
public void read(DataInput in) throws IOException {
throw new UnsupportedOperationException("This class can only be serialized");
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(rowSet.size());
List<DataType> types = rowSet.schema().types();
// unroll forEach manually to avoid a Consumer + try/catch for each value...
for (boolean hasRows = rowSet.hasCurrentRow(); hasRows; hasRows = rowSet.advanceRow()) {
for (int i = 0; i < rowSet.rowSize(); i++) {
Object value = rowSet.column(i);
// unpack Joda classes on the server-side to not 'pollute' the common project and thus the client
if (types.get(i).sqlType() == JDBCType.TIMESTAMP && value instanceof ReadableInstant) {
// NOCOMMIT feels like a hack that'd be better cleaned up another way.
value = ((ReadableInstant) value).getMillis();
}
ProtoUtils.writeValue(out, value, types.get(i).sqlType());
}
}
}
}
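The client-side counterpart of this write loop is Page.read, shown earlier in the diff: an int row count followed by one ProtoUtils-encoded value per cell, row by row. The snippet below is a hypothetical sketch of that symmetry, not part of the commit; it assumes the Page(List<ColumnInfo>) constructor that QueryInitResponse uses and that a freshly constructed Page grows to whatever row count it reads.

// Hypothetical round-trip sketch; not part of the commit.
package org.elasticsearch.xpack.sql.jdbc.net.protocol; // same package, so Page's constructor is accessible

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.JDBCType;
import java.util.List;

import static java.util.Collections.singletonList;

class PayloadRoundTripSketch {
    static Page roundTrip() throws IOException {
        // Produce the layout RowSetPayload.write emits: row count, then one encoded value per cell.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(1);                                      // one row
        ProtoUtils.writeValue(out, 10001, JDBCType.INTEGER);  // single INTEGER column

        // Consume it through the Payload interface on the client side.
        List<ColumnInfo> columns = singletonList(new ColumnInfo("emp_no", JDBCType.INTEGER, "", "", "", ""));
        Page page = new Page(columns);
        page.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return page;
    }
}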

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.node.NodeClient;
@@ -14,8 +14,8 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.plugin.AbstractSqlServer;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractProto;
import org.elasticsearch.xpack.sql.server.AbstractSqlServer;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.DataInputStream;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Response;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
package org.elasticsearch.xpack.sql.plugin.jdbc.action;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.jdbc.JdbcServer;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;

View File

@@ -70,7 +70,7 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
}
// NOCOMMIT move session information somewhere - like into scroll or something. We should be able to reuse something.
// generate the plan and once its done, generate the session id, store it and send back the response
// generate the plan and once it's done, generate the session id, store it and send back the response
planExecutor.sql(query, timeZone, chain(listener, c -> {
String id = generateId();
SESSIONS.put(id, c);

View File

@@ -1,46 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.server.jdbc;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ResultPage;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.joda.time.ReadableInstant;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.JDBCType;
/**
* Adapts {@link RowSet} into a {@link ResultPage} so it can be serialized.
* Note that we are careful not to read the {@linkplain RowSet} more than
* once.
*/
public class RowSetCursorResultPage extends ResultPage {
private final RowSet rowSet;
public RowSetCursorResultPage(RowSet rowSet) {
this.rowSet = rowSet;
}
public void write(DataOutput out) throws IOException {
int rows = rowSet.size();
out.writeInt(rows);
if (rows == 0) {
return;
}
do {
for (int column = 0; column < rowSet.rowSize(); column++) {
JDBCType columnType = rowSet.schema().types().get(column).sqlType();
Object value = rowSet.column(column);
if (columnType == JDBCType.TIMESTAMP && value instanceof ReadableInstant) {
// TODO it feels like there should be a better way to do this
value = ((ReadableInstant) value).getMillis();
}
writeValue(out, value, columnType);
}
} while (rowSet.advanceRow());
}
}