SQL: Remove usage of Settings inside SqlSettings (elastic/x-pack-elasticsearch#2757)

* Remove usage of Settings inside SqlSettings

Also hook client timeouts to the backend
Set UTC as default timezone when using CSV
As the JVM timezone changes, make sure to pin it to UTC since this is what the results are computed against

Original commit: elastic/x-pack-elasticsearch@3e7aad8c1f
This commit is contained in:
Costin Leau 2017-10-27 18:55:59 +03:00 committed by GitHub
parent 6b1d0d1c8e
commit af591b9edd
24 changed files with 250 additions and 148 deletions

View File

@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConnection;
import org.elasticsearch.xpack.sql.util.CollectionUtils;
import org.relique.io.TableReader;
import org.relique.jdbc.csv.CsvConnection;
@ -27,6 +28,7 @@ import java.sql.Statement;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.TimeZone;
import static org.hamcrest.Matchers.arrayWithSize;
@ -80,6 +82,10 @@ public abstract class CsvSpecTestCase extends SpecBaseIntegrationTestCase {
};
try (Connection csv = new CsvConnection(tableReader, csvProperties, "") {};
Connection es = esJdbc()) {
// make sure ES uses UTC (otherwise JDBC driver picks up the JVM timezone per spec/convention)
((JdbcConnection) es).setTimeZone(TimeZone.getTimeZone("UTC"));
// pass the testName as table for debugging purposes (in case the underlying reader is missing)
ResultSet expected = csv.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)
.executeQuery("SELECT * FROM " + csvTableName);

View File

@ -18,13 +18,16 @@ import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.time.Instant;
import java.util.TimeZone;
public class CliHttpClient implements AutoCloseable {
private final HttpClient http;
private final CliConfiguration cfg;
public CliHttpClient(CliConfiguration cfg) {
http = new HttpClient(cfg);
this.cfg = cfg;
this.http = new HttpClient(cfg);
}
public InfoResponse serverInfo() {
@ -35,15 +38,19 @@ public class CliHttpClient implements AutoCloseable {
public Response queryInit(String query, int fetchSize) {
// TODO allow customizing the time zone - this is what session set/reset/get should be about
QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), new TimeoutInfo(0, 0, 0));
QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), timeout());
return sendRequest(request);
}
public Response nextPage(byte[] cursor) {
QueryPageRequest request = new QueryPageRequest(cursor, new TimeoutInfo(0, 0, 0));
QueryPageRequest request = new QueryPageRequest(cursor, timeout());
return sendRequest(request);
}
private TimeoutInfo timeout() {
long clientTime = Instant.now().toEpochMilli();
return new TimeoutInfo(clientTime, cfg.queryTimeout(), cfg.pageTimeout());
}
private Response sendRequest(Request request) {
Bytes ba = http.post(out -> Proto.INSTANCE.writeRequest(request, out));
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ba.bytes(), 0, ba.size()))) {

View File

@ -56,7 +56,7 @@ public class JdbcConfiguration extends ConnectionConfiguration {
private boolean debug = false;
private String debugOut = DEBUG_OUTPUT_DEFAULT;
private final TimeZone timeZone;
private TimeZone timeZone;
public JdbcConfiguration(String u, Properties props) throws JdbcSQLException {
super(props);
@ -183,15 +183,6 @@ public class JdbcConfiguration extends ConnectionConfiguration {
}
}
public String userName() {
return settings().getProperty(AUTH_USER);
}
public String password() {
// NOCOMMIT make sure we're doing right by the password. Compare with other jdbc drivers and our security code.
return settings().getProperty(AUTH_PASS);
}
private int port() {
return hostAndPort.port > 0 ? hostAndPort.port : 9200;
}
@ -208,6 +199,10 @@ public class JdbcConfiguration extends ConnectionConfiguration {
return timeZone;
}
public void timeZone(TimeZone tz) {
timeZone = tz;
}
public static boolean canAccept(String url) {
return (StringUtils.hasText(url) && url.trim().startsWith(JdbcConfiguration.URL_PREFIX));
}

View File

@ -29,6 +29,7 @@ import java.sql.Statement;
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -53,7 +54,7 @@ public class JdbcConnection implements Connection, JdbcWrapper {
client = new JdbcHttpClient(connectionInfo);
url = connectionInfo.asUrl().toExternalForm();
userName = connectionInfo.userName();
userName = connectionInfo.authUser();
}
private void checkOpen() throws SQLException {
@ -447,4 +448,9 @@ public class JdbcConnection implements Connection, JdbcWrapper {
public int esInfoMinorVersion() throws SQLException {
return client.serverInfo().minorVersion;
}
public void setTimeZone(TimeZone tz) {
cfg.timeZone(tz);
}
}

View File

@ -39,13 +39,13 @@ public class ConnectionConfiguration {
public static final String NETWORK_TIMEOUT = "network.timeout";
private static final String NETWORK_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(1));
// 1m
// 90s
public static final String QUERY_TIMEOUT = "query.timeout";
private static final String QUERY_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(1));
private static final String QUERY_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.SECONDS.toMillis(90));
// 5m
// 45s
public static final String PAGE_TIMEOUT = "page.timeout";
private static final String PAGE_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(5));
private static final String PAGE_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.SECONDS.toMillis(45));
public static final String PAGE_SIZE = "page.size";
private static final String PAGE_SIZE_DEFAULT = "1000";

View File

@ -22,7 +22,7 @@ import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.session.Configuration;
import java.util.function.Function;
import java.util.function.Supplier;
@ -50,13 +50,13 @@ public class PlanExecutor {
this.planner = new Planner();
}
public SqlSession newSession(SqlSettings settings) {
public SqlSession newSession(Configuration cfg) {
Catalog catalog = catalogSupplier.apply(stateSupplier.get());
return new SqlSession(settings, client, catalog, functionRegistry, parser, optimizer, planner);
return new SqlSession(cfg, client, catalog, functionRegistry, parser, optimizer, planner);
}
public SearchSourceBuilder searchSource(String sql, SqlSettings settings) {
public SearchSourceBuilder searchSource(String sql, Configuration settings) {
PhysicalPlan executable = newSession(settings).executable(sql);
if (executable instanceof EsQueryExec) {
EsQueryExec e = (EsQueryExec) executable;
@ -68,10 +68,10 @@ public class PlanExecutor {
}
public void sql(String sql, ActionListener<RowSet> listener) {
sql(SqlSettings.EMPTY, sql, listener);
sql(Configuration.DEFAULT, sql, listener);
}
public void sql(SqlSettings sqlSettings, String sql, ActionListener<RowSet> listener) {
public void sql(Configuration sqlSettings, String sql, ActionListener<RowSet> listener) {
SqlSession session = newSession(sqlSettings);
try {
PhysicalPlan executable = session.executable(sql);

View File

@ -44,9 +44,9 @@ import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.querydsl.container.ScriptFieldRef;
import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef;
import org.elasticsearch.xpack.sql.querydsl.container.TotalCountRef;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.StringUtils;
@ -62,9 +62,8 @@ public class Scroller {
private final int size;
private final Client client;
public Scroller(Client client, SqlSettings settings) {
// NOCOMMIT the scroll time should be available in the request somehow. Rest is going to fail badly unless they set it.
this(client, TimeValue.timeValueSeconds(90), TimeValue.timeValueSeconds(45), settings.pageSize());
public Scroller(Client client, Configuration cfg) {
this(client, cfg.requestTimeout(), cfg.pageTimeout(), cfg.pageSize());
}
public Scroller(Client client, TimeValue keepAlive, TimeValue timeout, int size) {

View File

@ -10,7 +10,7 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.expression.function.aware.DistinctAware;
import org.elasticsearch.xpack.sql.expression.function.aware.TimeZoneAware;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.tree.Node;
import org.elasticsearch.xpack.sql.tree.NodeUtils;
import org.elasticsearch.xpack.sql.tree.NodeUtils.NodeInfo;
@ -51,12 +51,12 @@ abstract class AbstractFunctionRegistry implements FunctionRegistry {
@Override
public Function resolveFunction(UnresolvedFunction ur, SqlSettings settings) {
public Function resolveFunction(UnresolvedFunction ur, Configuration cfg) {
FunctionDefinition def = defs.get(normalize(ur.name()));
if (def == null) {
throw new SqlIllegalArgumentException("Cannot find function %s; this should have been caught during analysis", ur.name());
}
return createInstance(def.clazz(), ur, settings);
return createInstance(def.clazz(), ur, cfg);
}
@Override
@ -107,7 +107,7 @@ abstract class AbstractFunctionRegistry implements FunctionRegistry {
// If the function has certain 'aware'-ness (based on the interface implemented), the appropriate types are added to the signature
@SuppressWarnings("rawtypes")
private static Function createInstance(Class<? extends Function> clazz, UnresolvedFunction ur, SqlSettings settings) {
private static Function createInstance(Class<? extends Function> clazz, UnresolvedFunction ur, Configuration cfg) {
NodeInfo info = NodeUtils.info((Class<? extends Node>) clazz);
Class<?>[] pTypes = info.ctr.getParameterTypes();
@ -171,7 +171,7 @@ abstract class AbstractFunctionRegistry implements FunctionRegistry {
args.add(ur.distinct());
}
if (timezoneAware) {
args.add(settings.timeZone());
args.add(cfg.timeZone());
}
return (Function) info.ctr.newInstance(args.toArray());

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.sql.expression.function;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.session.Configuration;
import java.util.Collection;
public interface FunctionRegistry {
Function resolveFunction(UnresolvedFunction ur, SqlSettings settings);
Function resolveFunction(UnresolvedFunction ur, Configuration settings);
String concreteFunctionName(String alias);

View File

@ -15,13 +15,14 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
@ -49,7 +50,7 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
@Override
public String getName() {
return "xpack_sql_cli_action";
}
}
@Override
protected RestChannelConsumer innerPrepareRequest(Request request, Client client) throws IOException {
@ -90,7 +91,10 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
private Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
// TODO time zone support for CLI
SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize, Cursor.EMPTY);
SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize,
TimeValue.timeValueMillis(request.timeout.requestTimeout),
TimeValue.timeValueMillis(request.timeout.pageTimeout),
Cursor.EMPTY);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
CliFormatter formatter = new CliFormatter(response);
@ -108,7 +112,11 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
} catch (IOException e) {
throw new IllegalArgumentException("error reading the cursor");
}
SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, -1, cursor);
SqlRequest sqlRequest = new SqlRequest("", SqlRequest.DEFAULT_TIME_ZONE, 0,
TimeValue.timeValueMillis(request.timeout.requestTimeout),
TimeValue.timeValueMillis(request.timeout.pageTimeout),
cursor);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
String data = formatter.formatWithoutHeader(response);
@ -128,4 +136,4 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
throw new RuntimeException("unexpected trouble building the cursor", e);
}
}
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
@ -149,8 +150,10 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
}
private Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
SqlRequest sqlRequest = new SqlRequest(request.query, SqlRequest.DEFAULT_TIME_ZONE, request.fetchSize, Cursor.EMPTY);
sqlRequest.timeZone(DateTimeZone.forTimeZone(request.timeZone));
SqlRequest sqlRequest = new SqlRequest(request.query, DateTimeZone.forTimeZone(request.timeZone), request.fetchSize,
TimeValue.timeValueMillis(request.timeout.requestTimeout),
TimeValue.timeValueMillis(request.timeout.pageTimeout),
Cursor.EMPTY);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
List<JDBCType> types = new ArrayList<>(response.columns().size());
@ -173,7 +176,11 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
} catch (IOException e) {
throw new IllegalArgumentException("error reading the cursor");
}
SqlRequest sqlRequest = new SqlRequest(EMPTY, SqlRequest.DEFAULT_TIME_ZONE, 0, cursor);
// NB: the timezone and page size are locked already by the query so pass in defaults (as they are not read anyway)
SqlRequest sqlRequest = new SqlRequest(EMPTY, SqlRequest.DEFAULT_TIME_ZONE, 0,
TimeValue.timeValueMillis(request.timeout.requestTimeout),
TimeValue.timeValueMillis(request.timeout.pageTimeout),
cursor);
long start = System.nanoTime();
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(request, channel, response -> {
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),

View File

@ -12,8 +12,10 @@ import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@ -24,18 +26,25 @@ public abstract class AbstractSqlRequest extends ActionRequest implements Compos
public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC;
public static final int DEFAULT_FETCH_SIZE = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE;
public static final TimeValue DEFAULT_REQUEST_TIMEOUT = TimeValue.timeValueMillis(TimeoutInfo.DEFAULT_REQUEST_TIMEOUT);
public static final TimeValue DEFAULT_PAGE_TIMEOUT = TimeValue.timeValueMillis(TimeoutInfo.DEFAULT_PAGE_TIMEOUT);
private String query = "";
private DateTimeZone timeZone = DEFAULT_TIME_ZONE;
private int fetchSize = DEFAULT_FETCH_SIZE;
private TimeValue requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private TimeValue pageTimeout = DEFAULT_PAGE_TIMEOUT;
public AbstractSqlRequest() {
super();
}
public AbstractSqlRequest(String query, DateTimeZone timeZone, int fetchSize) {
public AbstractSqlRequest(String query, DateTimeZone timeZone, int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
this.query = query;
this.timeZone = timeZone;
this.fetchSize = fetchSize;
this.requestTimeout = requestTimeout;
this.pageTimeout = pageTimeout;
}
public static <R extends AbstractSqlRequest> ObjectParser<R, Void> objectParser(Supplier<R> supplier) {
@ -44,7 +53,12 @@ public abstract class AbstractSqlRequest extends ActionRequest implements Compos
parser.declareString(AbstractSqlRequest::query, new ParseField("query"));
parser.declareString((request, zoneId) -> request.timeZone(DateTimeZone.forID(zoneId)), new ParseField("time_zone"));
parser.declareInt(AbstractSqlRequest::fetchSize, new ParseField("fetch_size"));
parser.declareString(
(request, timeout) -> request.requestTimeout(TimeValue.parseTimeValue(timeout, DEFAULT_REQUEST_TIMEOUT, "request_timeout")),
new ParseField("request_timeout"));
parser.declareString(
(request, timeout) -> request.pageTimeout(TimeValue.parseTimeValue(timeout, DEFAULT_PAGE_TIMEOUT, "page_timeout")),
new ParseField("page_timeout"));
return parser;
}
@ -84,18 +98,39 @@ public abstract class AbstractSqlRequest extends ActionRequest implements Compos
*/
public AbstractSqlRequest fetchSize(int fetchSize) {
if (fetchSize <= 0) {
throw new IllegalArgumentException("fetch_size must be more than 0");
throw new IllegalArgumentException("fetch_size must be more than 0.");
}
this.fetchSize = fetchSize;
return this;
}
public TimeValue requestTimeout() {
return requestTimeout;
}
public AbstractSqlRequest requestTimeout(TimeValue requestTimeout) {
this.requestTimeout = requestTimeout;
return this;
}
public TimeValue pageTimeout() {
return pageTimeout;
}
public AbstractSqlRequest pageTimeout(TimeValue pageTimeout) {
this.pageTimeout = pageTimeout;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
query = in.readString();
timeZone = DateTimeZone.forID(in.readString());
fetchSize = in.readVInt();
requestTimeout = new TimeValue(in);
pageTimeout = new TimeValue(in);
}
@Override
@ -104,11 +139,13 @@ public abstract class AbstractSqlRequest extends ActionRequest implements Compos
out.writeString(query);
out.writeString(timeZone.getID());
out.writeVInt(fetchSize);
requestTimeout.writeTo(out);
pageTimeout.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(query, timeZone, fetchSize);
return Objects.hash(query, timeZone, fetchSize, requestTimeout, pageTimeout);
}
@Override
@ -124,6 +161,8 @@ public abstract class AbstractSqlRequest extends ActionRequest implements Compos
AbstractSqlRequest other = (AbstractSqlRequest) obj;
return Objects.equals(query, other.query)
&& Objects.equals(timeZone, other.timeZone)
&& fetchSize == other.fetchSize;
&& fetchSize == other.fetchSize
&& Objects.equals(requestTimeout, other.requestTimeout)
&& Objects.equals(pageTimeout, other.pageTimeout);
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
@ -33,8 +34,8 @@ public class SqlRequest extends AbstractSqlRequest {
public SqlRequest() {}
public SqlRequest(String query, DateTimeZone timeZone, int fetchSize, Cursor cursor) {
super(query, timeZone, fetchSize);
public SqlRequest(String query, DateTimeZone timeZone, int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, Cursor cursor) {
super(query, timeZone, fetchSize, requestTimeout, pageTimeout);
this.cursor = cursor;
}

View File

@ -7,20 +7,24 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_FETCH_SIZE;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_PAGE_TIMEOUT;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_REQUEST_TIMEOUT;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_TIME_ZONE;
public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlResponse, SqlRequestBuilder> {
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action) {
this(client, action, "", DEFAULT_TIME_ZONE, DEFAULT_FETCH_SIZE, Cursor.EMPTY);
this(client, action, "", DEFAULT_TIME_ZONE, DEFAULT_FETCH_SIZE, DEFAULT_REQUEST_TIMEOUT, DEFAULT_PAGE_TIMEOUT, Cursor.EMPTY);
}
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, int fetchSize, Cursor nextPageInfo) {
super(client, action, new SqlRequest(query, timeZone, fetchSize, nextPageInfo));
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, int fetchSize,
TimeValue requestTimeout, TimeValue pageTimeout, Cursor nextPageInfo) {
super(client, action, new SqlRequest(query, timeZone, fetchSize, requestTimeout, pageTimeout, nextPageInfo));
}
public SqlRequestBuilder query(String query) {
@ -38,4 +42,13 @@ public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlRespo
return this;
}
public SqlRequestBuilder requestTimeout(TimeValue timeout) {
request.requestTimeout(timeout);
return this;
}
public SqlRequestBuilder pageTimeout(TimeValue timeout) {
request.pageTimeout(timeout);
return this;
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.joda.time.DateTimeZone;
@ -18,8 +19,8 @@ public class SqlTranslateRequest extends AbstractSqlRequest {
public SqlTranslateRequest() {}
public SqlTranslateRequest(String query, DateTimeZone timeZone, int fetchSize) {
super(query, timeZone, fetchSize);
public SqlTranslateRequest(String query, DateTimeZone timeZone, int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
super(query, timeZone, fetchSize, requestTimeout, pageTimeout);
}
@Override

View File

@ -7,20 +7,24 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTimeZone;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_FETCH_SIZE;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_PAGE_TIMEOUT;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_REQUEST_TIMEOUT;
import static org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest.DEFAULT_TIME_ZONE;
public class SqlTranslateRequestBuilder
extends ActionRequestBuilder<SqlTranslateRequest, SqlTranslateResponse, SqlTranslateRequestBuilder> {
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action) {
this(client, action, null, DEFAULT_TIME_ZONE, DEFAULT_FETCH_SIZE);
this(client, action, null, DEFAULT_TIME_ZONE, DEFAULT_FETCH_SIZE, DEFAULT_REQUEST_TIMEOUT, DEFAULT_PAGE_TIMEOUT);
}
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action, String query, DateTimeZone timeZone, int fetchSize) {
super(client, action, new SqlTranslateRequest(query, timeZone, fetchSize));
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action, String query, DateTimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
super(client, action, new SqlTranslateRequest(query, timeZone, fetchSize, requestTimeout, pageTimeout));
}
public SqlTranslateRequestBuilder query(String query) {

View File

@ -16,9 +16,9 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.ArrayList;
@ -53,10 +53,9 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
*/
public static void operation(PlanExecutor planExecutor, SqlRequest request, ActionListener<SqlResponse> listener) {
if (request.cursor() == Cursor.EMPTY) {
SqlSettings sqlSettings = new SqlSettings(Settings.builder()
.put(SqlSettings.PAGE_SIZE, request.fetchSize())
.put(SqlSettings.TIMEZONE_ID, request.timeZone().getID()).build());
planExecutor.sql(sqlSettings, request.query(),
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(),
request.requestTimeout(), request.pageTimeout());
planExecutor.sql(cfg, request.query(),
ActionListener.wrap(cursor -> listener.onResponse(createResponse(true, cursor)), listener::onFailure));
} else {
planExecutor.nextPage(request.cursor(),

View File

@ -15,7 +15,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.session.Configuration;
public class TransportSqlTranslateAction extends HandledTransportAction<SqlTranslateRequest, SqlTranslateResponse> {
@ -39,10 +39,8 @@ public class TransportSqlTranslateAction extends HandledTransportAction<SqlTrans
sqlLicenseChecker.checkIfSqlAllowed();
String query = request.query();
SqlSettings sqlSettings = new SqlSettings(Settings.builder()
.put(SqlSettings.PAGE_SIZE, request.fetchSize())
.put(SqlSettings.TIMEZONE_ID, request.timeZone().getID()).build());
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(), request.requestTimeout(), request.pageTimeout());
listener.onResponse(new SqlTranslateResponse(planExecutor.searchSource(query, sqlSettings)));
listener.onResponse(new SqlTranslateResponse(planExecutor.searchSource(query, cfg)));
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.sql.plugin.sql.action.AbstractSqlRequest;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryInitRequest;
import org.joda.time.DateTimeZone;
// Typed object holding properties for a given
public class Configuration {
public static final Configuration DEFAULT = new Configuration(DateTimeZone.UTC,
AbstractQueryInitRequest.DEFAULT_FETCH_SIZE,
AbstractSqlRequest.DEFAULT_REQUEST_TIMEOUT,
AbstractSqlRequest.DEFAULT_PAGE_TIMEOUT);
private DateTimeZone timeZone;
private int pageSize;
private TimeValue requestTimeout;
private TimeValue pageTimeout;
public Configuration(DateTimeZone tz, int pageSize, TimeValue requestTimeout, TimeValue pageTimeout) {
this.timeZone = tz;
this.pageSize = pageSize;
this.requestTimeout = requestTimeout;
this.pageTimeout = pageTimeout;
}
public DateTimeZone timeZone() {
return timeZone;
}
public int pageSize() {
return pageSize;
}
public TimeValue requestTimeout() {
return requestTimeout;
}
public TimeValue pageTimeout() {
return pageTimeout;
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
@ -22,7 +21,6 @@ import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import java.util.List;
import java.util.function.Function;
public class SqlSession {
@ -35,12 +33,12 @@ public class SqlSession {
private final Planner planner;
private final Analyzer analyzer;
private final SqlSettings defaults; // NOCOMMIT this doesn't look used - it is for RESET SESSION
private SqlSettings settings;
private final Configuration defaults; // NOCOMMIT this doesn't look used - it is for RESET SESSION
private Configuration settings;
// thread-local used for sharing settings across the plan compilation
// TODO investigate removing
public static final ThreadLocal<SqlSettings> CURRENT_SETTINGS = new ThreadLocal<SqlSettings>() {
public static final ThreadLocal<Configuration> CURRENT_SETTINGS = new ThreadLocal<Configuration>() {
@Override
public String toString() {
return "SQL Session";
@ -52,7 +50,7 @@ public class SqlSession {
other.parser, other.optimizer(), other.planner());
}
public SqlSession(SqlSettings defaults, Client client,
public SqlSession(Configuration defaults, Client client,
Catalog catalog, FunctionRegistry functionRegistry,
SqlParser parser,
Optimizer optimizer,
@ -139,17 +137,11 @@ public class SqlSession {
executable(sql).execute(this, listener);
}
public SqlSettings defaults() {
public Configuration defaults() {
return defaults;
}
public SqlSettings settings() {
return settings;
}
// session SET
public SqlSettings updateSettings(Function<Settings, Settings> transformer) {
settings = new SqlSettings(transformer.apply(settings.cfg()));
public Configuration settings() {
return settings;
}

View File

@ -1,52 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.common.settings.Settings;
import org.joda.time.DateTimeZone;
import java.util.TimeZone;
// Typed object holding properties for a given
public class SqlSettings {
public static final SqlSettings EMPTY = new SqlSettings(Settings.EMPTY);
public static final String TIMEZONE_ID = "sql.timeZoneId";
public static final String TIMEZONE_ID_DEFAULT = null;
public static final String PAGE_SIZE = "sql.fetch.size";
public static final int PAGE_SIZE_DEFAULT = 100;
private final Settings cfg;
public SqlSettings(Settings cfg) {
// NOCOMMIT investigate taking the arguments we need instead of Settings
this.cfg = cfg;
}
public Settings cfg() {
return cfg;
}
@Override
public String toString() {
return cfg.toDelimitedString(',');
}
public String timeZoneId() {
return cfg.get(TIMEZONE_ID, TIMEZONE_ID_DEFAULT);
}
public DateTimeZone timeZone() {
// use this instead of DateTimeZone#forID because DTZ doesn't support all of j.u.TZ IDs (such as IST)
return DateTimeZone.forTimeZone(TimeZone.getTimeZone(TIMEZONE_ID));
}
public int pageSize() {
return cfg.getAsInt(PAGE_SIZE, PAGE_SIZE_DEFAULT);
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction;
@ -21,7 +22,12 @@ public class SqlRequestTests extends AbstractStreamableTestCase<SqlRequest> {
@Override
protected SqlRequest createTestInstance() {
return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), between(1, Integer.MAX_VALUE), randomCursor());
return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), between(1, Integer.MAX_VALUE),
randomTV(), randomTV(), randomCursor());
}
private TimeValue randomTV() {
return TimeValue.parseTimeValue(randomTimeValue(), null, "test");
}
@Override
@ -40,6 +46,10 @@ public class SqlRequestTests extends AbstractStreamableTestCase<SqlRequest> {
request -> (SqlRequest) getCopyFunction().copy(request)
.timeZone(randomValueOtherThan(request.timeZone(), ESTestCase::randomDateTimeZone)),
request -> (SqlRequest) getCopyFunction().copy(request)
.fetchSize(randomValueOtherThan(request.fetchSize(), () -> between(1, Integer.MAX_VALUE))));
.fetchSize(randomValueOtherThan(request.fetchSize(), () -> between(1, Integer.MAX_VALUE))),
request -> (SqlRequest) getCopyFunction().copy(request)
.requestTimeout(randomValueOtherThan(request.requestTimeout(), () -> randomTV())),
request -> (SqlRequest) getCopyFunction().copy(request)
.pageTimeout(randomValueOtherThan(request.pageTimeout(), () -> randomTV())));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction;
@ -13,7 +14,12 @@ public class SqlTranslateRequestTests extends AbstractStreamableTestCase<SqlTran
@Override
protected SqlTranslateRequest createTestInstance() {
return new SqlTranslateRequest(randomAlphaOfLength(10), randomDateTimeZone(), between(1, Integer.MAX_VALUE));
return new SqlTranslateRequest(randomAlphaOfLength(10), randomDateTimeZone(), between(1, Integer.MAX_VALUE), randomTV(),
randomTV());
}
private TimeValue randomTV() {
return TimeValue.parseTimeValue(randomTimeValue(), null, "test");
}
@Override
@ -30,6 +36,10 @@ public class SqlTranslateRequestTests extends AbstractStreamableTestCase<SqlTran
request -> (SqlTranslateRequest) getCopyFunction().copy(request)
.timeZone(randomValueOtherThan(request.timeZone(), ESTestCase::randomDateTimeZone)),
request -> (SqlTranslateRequest) getCopyFunction().copy(request)
.fetchSize(randomValueOtherThan(request.fetchSize(), () -> between(1, Integer.MAX_VALUE))));
.fetchSize(randomValueOtherThan(request.fetchSize(), () -> between(1, Integer.MAX_VALUE))),
request -> (SqlTranslateRequest) getCopyFunction().copy(request)
.requestTimeout(randomValueOtherThan(request.requestTimeout(), () -> randomTV())),
request -> (SqlTranslateRequest) getCopyFunction().copy(request)
.pageTimeout(randomValueOtherThan(request.pageTimeout(), () -> randomTV())));
}
}

View File

@ -9,31 +9,43 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Common class handling timeouts. Due to the nature of JDBC, all timeout values are expressed as millis.
* Contains
*/
public class TimeoutInfo {
public final long clientTime, timeout, requestTimeout;
public static final long DEFAULT_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(90);
public static final long DEFAULT_PAGE_TIMEOUT = TimeUnit.SECONDS.toMillis(45);
// client time - millis since epoch of when the client made the request
// request timeout - how long the client is willing to wait for the server to process its request
// page timeout - how long retrieving the next page (of the query) should take (this is used to scroll across pages)
public final long clientTime, requestTimeout, pageTimeout;
public TimeoutInfo(long clientTime, long timeout, long requestTimeout) {
this.clientTime = clientTime;
this.timeout = timeout;
this.requestTimeout = requestTimeout;
this.requestTimeout = timeout;
this.pageTimeout = requestTimeout;
}
TimeoutInfo(DataInput in) throws IOException {
clientTime = in.readLong();
timeout = in.readLong();
requestTimeout = in.readLong();
pageTimeout = in.readLong();
}
void writeTo(DataOutput out) throws IOException {
out.writeLong(clientTime);
out.writeLong(timeout);
out.writeLong(requestTimeout);
out.writeLong(pageTimeout);
}
@Override
public String toString() {
return "client=" + clientTime + ",timeout=" + timeout + ",request=" + requestTimeout;
return "client=" + clientTime + ",request=" + requestTimeout + ",page=" + pageTimeout;
}
@Override
@ -43,12 +55,12 @@ public class TimeoutInfo {
}
TimeoutInfo other = (TimeoutInfo) obj;
return clientTime == other.clientTime
&& timeout == other.timeout
&& requestTimeout == other.requestTimeout;
&& requestTimeout == other.requestTimeout
&& pageTimeout == other.pageTimeout;
}
@Override
public int hashCode() {
return Objects.hash(clientTime, timeout, requestTimeout);
return Objects.hash(clientTime, requestTimeout, pageTimeout);
}
}