SQL: XPack FeatureSet functionality (#35725)

* Introduced "client.id" parameter for REST requests
* Bug that made the Verifier run twice, fixed in the Analyzer
* Single node IT and unit testing
This commit is contained in:
Andrei Stefan 2018-11-26 07:11:43 +02:00 committed by GitHub
parent a0f8a22d53
commit 00e6fec718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 1766 additions and 135 deletions

View File

@ -102,6 +102,11 @@ Example response:
"available" : true,
"enabled" : false
},
"sql" : {
"description" : "SQL access to Elasticsearch",
"available" : true,
"enabled" : true
},
"watcher" : {
"description" : "Alerting, Notification and Automation for the Elastic Stack",
"available" : true,

View File

@ -159,6 +159,7 @@ import org.elasticsearch.xpack.core.security.authc.support.mapper.expressiondsl.
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivilege;
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivileges;
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction;
@ -387,6 +388,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
new NamedWriteableRegistry.Entry(RoleMapperExpression.class, AnyExpression.NAME, AnyExpression::new),
new NamedWriteableRegistry.Entry(RoleMapperExpression.class, FieldExpression.NAME, FieldExpression::new),
new NamedWriteableRegistry.Entry(RoleMapperExpression.class, ExceptExpression.NAME, ExceptExpression::new),
// sql
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SQL, SqlFeatureSetUsage::new),
// watcher
new NamedWriteableRegistry.Entry(MetaData.Custom.class, WatcherMetaData.TYPE, WatcherMetaData::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, WatcherMetaData.TYPE, WatcherMetaData::readDiffFrom),

View File

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.sql;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import java.io.IOException;
import java.util.Map;
public class SqlFeatureSetUsage extends XPackFeatureSet.Usage {
private final Map<String, Object> stats;
public SqlFeatureSetUsage(StreamInput in) throws IOException {
super(in);
stats = in.readMap();
}
public SqlFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> stats) {
super(XPackField.SQL, available, enabled);
this.stats = stats;
}
public Map<String, Object> stats() {
return stats;
}
@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (enabled) {
for (Map.Entry<String, Object> entry : stats.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(stats);
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.SqlQueryRequest;
import org.elasticsearch.xpack.sql.proto.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.sql.SQLException;
@ -51,9 +52,10 @@ public class JdbcHttpClient {
public Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) throws SQLException {
int fetch = meta.fetchSize() > 0 ? meta.fetchSize() : conCfg.pageSize();
SqlQueryRequest sqlRequest = new SqlQueryRequest(Mode.JDBC, sql, params, null,
Protocol.TIME_ZONE,
fetch, TimeValue.timeValueMillis(meta.timeoutInMs()), TimeValue.timeValueMillis(meta.queryTimeoutInMs()));
SqlQueryRequest sqlRequest = new SqlQueryRequest(sql, params, null, Protocol.TIME_ZONE,
fetch,
TimeValue.timeValueMillis(meta.timeoutInMs()), TimeValue.timeValueMillis(meta.queryTimeoutInMs()),
new RequestInfo(Mode.JDBC));
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
}
@ -63,8 +65,8 @@ public class JdbcHttpClient {
* the scroll id to use to fetch the next page.
*/
public Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) throws SQLException {
SqlQueryRequest sqlRequest = new SqlQueryRequest(Mode.JDBC, cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
TimeValue.timeValueMillis(meta.queryTimeoutInMs()));
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC));
SqlQueryResponse response = httpClient.query(sqlRequest);
return new Tuple<>(response.cursor(), response.rows());
}

View File

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa.single_node;
import org.elasticsearch.xpack.sql.qa.rest.RestSqlUsageTestCase;
public class RestSqlUsageIT extends RestSqlUsageTestCase {
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa;
import java.util.Locale;
public enum FeatureMetric {
COMMAND,
GROUPBY,
HAVING,
JOIN,
LIMIT,
LOCAL,
ORDERBY,
SUBSELECT,
WHERE;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa;
import java.util.Locale;
public enum QueryMetric {
JDBC, ODBC, CLI, CANVAS, REST;
public static QueryMetric fromString(String metric) {
if (metric == null || metric.equalsIgnoreCase("plain")) {
return REST;
}
return QueryMetric.valueOf(metric.toUpperCase(Locale.ROOT));
}
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,336 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa.rest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.qa.FeatureMetric;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
public abstract class RestSqlUsageTestCase extends ESRestTestCase {
private List<IndexDocument> testData = Arrays.asList(
new IndexDocument("used", "Don Quixote", 1072),
new IndexDocument("used", "Vacuum Diagrams", 335),
new IndexDocument("new", "Darwin's Radio", 270),
new IndexDocument("used", "The Martian", 387),
new IndexDocument("new", "Moving Mars", 495)
);
private enum ClientType {
CANVAS, CLI, JDBC, ODBC, REST;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}
private Map<String,Integer> baseMetrics = new HashMap<String,Integer>();
private Integer baseClientTypeTotalQueries = 0;
private Integer baseClientTypeFailedQueries = 0;
private Integer baseAllTotalQueries = 0;
private Integer baseAllFailedQueries = 0;
private Integer baseTranslateRequests = 0;
private String clientType;
private boolean ignoreClientType;
/**
* This method gets the metrics' values before the test runs, in case these values
* were changed by other tests running in the same REST test cluster. The test itself
* will count the new metrics' values starting from the base values initialized here.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Before
private void getBaseMetrics() throws UnsupportedOperationException, IOException {
Map<String, Object> baseStats = getStats();
List<Map<String, Map<String, Map>>> nodesListStats = (List) baseStats.get("stats");
// used for "client.id" request parameter value, but also for getting the stats from ES
clientType = randomFrom(ClientType.values()).toString();
ignoreClientType = randomBoolean();
// "client.id" parameter will not be sent in the requests
// and "clientType" will only be used for getting the stats back from ES
if (ignoreClientType) {
clientType = ClientType.REST.toString();
}
for (Map perNodeStats : nodesListStats) {
Map featuresMetrics = (Map) ((Map) perNodeStats.get("stats")).get("features");
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
for (FeatureMetric metric : FeatureMetric.values()) {
baseMetrics.put(metric.toString(), (Integer) featuresMetrics.get(metric.toString()));
}
// initialize the "base" metric values with whatever values are already recorder on ES
baseClientTypeTotalQueries = ((Map<String,Integer>) queriesMetrics.get(clientType)).get("total");
baseClientTypeFailedQueries = ((Map<String,Integer>) queriesMetrics.get(clientType)).get("failed");
baseAllTotalQueries = ((Map<String,Integer>) queriesMetrics.get("_all")).get("total");
baseAllFailedQueries = ((Map<String,Integer>) queriesMetrics.get("_all")).get("failed");
baseTranslateRequests = ((Map<String,Integer>) queriesMetrics.get("translate")).get("count");
}
}
public void testSqlRestUsage() throws IOException {
index(testData);
//
// random WHERE and ORDER BY queries
//
int randomWhereExecutions = randomIntBetween(1, 15);
int clientTypeTotalQueries = baseClientTypeTotalQueries + randomWhereExecutions;
int allTotalQueries = baseAllTotalQueries + randomWhereExecutions;
for (int i = 0; i < randomWhereExecutions; i++) {
runSql("SELECT name FROM library WHERE page_count > 100 ORDER BY page_count");
}
Map<String, Object> responseAsMap = getStats();
assertFeatureMetric(baseMetrics.get("where") + randomWhereExecutions, responseAsMap, "where");
assertFeatureMetric(baseMetrics.get("orderby") + randomWhereExecutions, responseAsMap, "orderby");
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
//
// random HAVING and GROUP BY queries
//
int randomHavingExecutions = randomIntBetween(1, 15);
clientTypeTotalQueries += randomHavingExecutions;
allTotalQueries += randomHavingExecutions;
for (int i = 0; i < randomHavingExecutions; i++) {
runSql("SELECT condition FROM library GROUP BY condition HAVING MAX(page_count) > 1000");
}
responseAsMap = getStats();
assertFeatureMetric(baseMetrics.get("having") + randomHavingExecutions, responseAsMap, "having");
assertFeatureMetric(baseMetrics.get("groupby") + randomHavingExecutions, responseAsMap, "groupby");
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
//
// random LIMIT queries
//
int randomLimitExecutions = randomIntBetween(1, 15);
clientTypeTotalQueries += randomLimitExecutions;
allTotalQueries += randomLimitExecutions;
for (int i = 0; i < randomLimitExecutions; i++) {
runSql("SELECT * FROM library LIMIT " + testData.size());
}
responseAsMap = getStats();
assertFeatureMetric(baseMetrics.get("limit") + randomLimitExecutions, responseAsMap, "limit");
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
//
// random LOCALly executed queries
//
int randomLocalExecutions = randomIntBetween(1, 15);
clientTypeTotalQueries += randomLocalExecutions;
allTotalQueries += randomLocalExecutions;
for (int i = 0; i < randomLocalExecutions; i++) {
runSql("SELECT 1+2");
}
responseAsMap = getStats();
assertFeatureMetric(baseMetrics.get("local") + randomLocalExecutions, responseAsMap, "local");
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
//
// random COMMANDs
//
int randomCommandExecutions = randomIntBetween(1, 15);
clientTypeTotalQueries += randomCommandExecutions;
allTotalQueries += randomCommandExecutions;
for (int i = 0; i < randomCommandExecutions; i++) {
runSql(randomFrom("SHOW FUNCTIONS", "SHOW COLUMNS FROM library", "SHOW SCHEMAS",
"SHOW TABLES", "SYS CATALOGS", "SYS COLUMNS LIKE '%name'",
"SYS TABLES", "SYS TYPES"));
}
responseAsMap = getStats();
assertFeatureMetric(baseMetrics.get("command") + randomCommandExecutions, responseAsMap, "command");
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
//
// random TRANSLATE requests
//
int randomTranslateExecutions = randomIntBetween(1, 15);
for (int i = 0; i < randomTranslateExecutions; i++) {
runTranslate("SELECT name FROM library WHERE page_count > 100 ORDER BY page_count");
}
responseAsMap = getStats();
assertTranslateQueryMetric(baseTranslateRequests + randomTranslateExecutions, responseAsMap);
//
// random failed queries
//
int randomFailedExecutions = randomIntBetween(1, 15);
int clientTypeFailedQueries = baseClientTypeFailedQueries + randomFailedExecutions;
int allFailedQueries = baseAllFailedQueries + randomFailedExecutions;
allTotalQueries += randomFailedExecutions;
clientTypeTotalQueries += randomFailedExecutions;
for (int i = 0; i < randomFailedExecutions; i++) {
// not interested in the exception type, but in the fact that the metrics are incremented
// when an exception is thrown
expectThrows(Exception.class, () -> {
runSql(randomFrom("SELECT missing_field FROM library",
"SELECT * FROM missing_index",
"SELECTT wrong_command",
"SELECT name + page_count AS not_allowed FROM library",
"SELECT incomplete_query FROM"));
});
}
responseAsMap = getStats();
assertClientTypeAndAllFailedQueryMetrics(clientTypeFailedQueries, allFailedQueries, responseAsMap);
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
}
private void assertClientTypeAndAllQueryMetrics(int clientTypeTotalQueries, int allTotalQueries, Map<String, Object> responseAsMap)
throws IOException {
assertClientTypeQueryMetric(clientTypeTotalQueries, responseAsMap, "total");
assertAllQueryMetric(allTotalQueries, responseAsMap, "total");
}
private void assertClientTypeAndAllFailedQueryMetrics(int clientTypeFailedQueries, int allFailedQueries,
Map<String, Object> responseAsMap) throws IOException {
assertClientTypeQueryMetric(clientTypeFailedQueries, responseAsMap, "failed");
assertAllQueryMetric(allFailedQueries, responseAsMap, "failed");
}
private void index(List<IndexDocument> docs) throws IOException {
Request request = new Request("POST", "/library/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (IndexDocument doc : docs) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"condition\":\"" + doc.condition + "\",\"name\":\"" + doc.name + "\",\"page_count\":" + doc.pageCount + "}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);
}
private Map<String, Object> getStats() throws UnsupportedOperationException, IOException {
Request request = new Request("GET", "/_xpack/sql/stats");
Map<String, Object> responseAsMap;
try (InputStream content = client().performRequest(request).getEntity().getContent()) {
responseAsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return responseAsMap;
}
private void runTranslate(String sql) throws IOException {
Request request = new Request("POST", "/_xpack/sql/translate");
if (randomBoolean()) {
// We default to JSON but we force it randomly for extra coverage
request.addParameter("format", "json");
}
if (randomBoolean()) {
// JSON is the default but randomly set it sometime for extra coverage
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Accept", randomFrom("*/*", "application/json"));
request.setOptions(options);
}
request.setEntity(new StringEntity("{\"query\":\"" + sql + "\"}", ContentType.APPLICATION_JSON));
client().performRequest(request);
}
private void runSql(String sql) throws IOException {
String mode = (clientType.equals(ClientType.JDBC.toString()) || clientType.equals(ClientType.ODBC.toString())) ?
clientType.toString() : Mode.PLAIN.toString();
runSql(mode, clientType, sql);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertTranslateQueryMetric(int expected, Map<String, Object> responseAsMap) throws IOException {
List<Map<String, Map<String, Map>>> nodesListStats = (List) responseAsMap.get("stats");
int actualMetricValue = 0;
for (Map perNodeStats : nodesListStats) {
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
Map perTypeQueriesMetrics = (Map) queriesMetrics.get("translate");
actualMetricValue += (int) perTypeQueriesMetrics.get("count");
}
assertEquals(expected, actualMetricValue);
}
private void runSql(String mode, String restClient, String sql) throws IOException {
Request request = new Request("POST", "/_xpack/sql");
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
if (randomBoolean()) {
// We default to JSON but we force it randomly for extra coverage
request.addParameter("format", "json");
}
if (false == mode.isEmpty()) {
request.addParameter("mode", mode); // JDBC or PLAIN mode
}
// randomly use the "client.id" parameter or not
if (false == ignoreClientType) {
request.addParameter("client.id", restClient);
}
if (randomBoolean()) {
// JSON is the default but randomly set it sometime for extra coverage
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Accept", randomFrom("*/*", "application/json"));
request.setOptions(options);
}
request.setEntity(new StringEntity("{\"query\":\"" + sql + "\"}", ContentType.APPLICATION_JSON));
client().performRequest(request);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertFeatureMetric(int expected, Map<String, Object> responseAsMap, String feature) throws IOException {
List<Map<String, ?>> nodesListStats = (List<Map<String, ?>>) responseAsMap.get("stats");
int actualMetricValue = 0;
for (Map perNodeStats : nodesListStats) {
Map featuresMetrics = (Map) ((Map) perNodeStats.get("stats")).get("features");
actualMetricValue += (int) featuresMetrics.get(feature);
}
assertEquals(expected, actualMetricValue);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertQueryMetric(int expected, Map<String, Object> responseAsMap, String queryType, String metric) throws IOException {
List<Map<String, Map<String, Map>>> nodesListStats = (List) responseAsMap.get("stats");
int actualMetricValue = 0;
for (Map perNodeStats : nodesListStats) {
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
Map perTypeQueriesMetrics = (Map) queriesMetrics.get(queryType);
actualMetricValue += (int) perTypeQueriesMetrics.get(metric);
}
assertEquals(expected, actualMetricValue);
}
private void assertClientTypeQueryMetric(int expected, Map<String, Object> responseAsMap, String metric) throws IOException {
assertQueryMetric(expected, responseAsMap, clientType, metric);
}
private void assertAllQueryMetric(int expected, Map<String, Object> responseAsMap, String metric) throws IOException {
assertQueryMetric(expected, responseAsMap, "_all", metric);
}
private class IndexDocument {
private String condition;
private String name;
private int pageCount;
IndexDocument(String condition, String name, int pageCount) {
this.condition = condition;
this.name = name;
this.pageCount = pageCount;
}
}
}

View File

@ -15,8 +15,8 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.io.IOException;
@ -44,9 +44,9 @@ public abstract class AbstractSqlQueryRequest extends AbstractSqlRequest impleme
super();
}
public AbstractSqlQueryRequest(Mode mode, String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
super(mode);
public AbstractSqlQueryRequest(String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
super(requestInfo);
this.query = query;
this.params = params;
this.timeZone = timeZone;

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import java.io.IOException;
import java.util.Objects;
@ -24,25 +25,27 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
*/
public abstract class AbstractSqlRequest extends ActionRequest implements ToXContent {
private Mode mode = Mode.PLAIN;
private RequestInfo requestInfo;
protected AbstractSqlRequest() {
}
protected AbstractSqlRequest(Mode mode) {
this.mode = mode;
protected AbstractSqlRequest(RequestInfo requestInfo) {
this.requestInfo = requestInfo;
}
protected AbstractSqlRequest(StreamInput in) throws IOException {
super(in);
mode = in.readEnum(Mode.class);
Mode mode = in.readEnum(Mode.class);
String clientId = in.readOptionalString();
requestInfo = new RequestInfo(mode, clientId);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (mode == null) {
if (requestInfo == null || requestInfo.mode() == null) {
validationException = addValidationError("[mode] is required", validationException);
}
return validationException;
@ -56,19 +59,36 @@ public abstract class AbstractSqlRequest extends ActionRequest implements ToXCon
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeEnum(mode);
out.writeEnum(requestInfo.mode());
out.writeOptionalString(requestInfo.clientId());
}
public RequestInfo requestInfo() {
return requestInfo;
}
public void requestInfo(RequestInfo requestInfo) {
this.requestInfo = requestInfo;
}
public Mode mode() {
return mode;
return requestInfo.mode();
}
public void mode(Mode mode) {
this.mode = mode;
this.requestInfo.mode(mode);
}
public void mode(String mode) {
this.mode = Mode.fromString(mode);
this.requestInfo.mode(Mode.fromString(mode));
}
public String clientId() {
return requestInfo.clientId();
}
public void clientId(String clientId) {
this.requestInfo.clientId(clientId);
}
@Override
@ -76,12 +96,12 @@ public abstract class AbstractSqlRequest extends ActionRequest implements ToXCon
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractSqlRequest that = (AbstractSqlRequest) o;
return mode == that.mode;
return Objects.equals(requestInfo, that.requestInfo);
}
@Override
public int hashCode() {
return Objects.hash(mode);
return requestInfo.hashCode();
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import java.io.IOException;
import java.util.Objects;
@ -40,9 +41,9 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
public SqlClearCursorRequest() {
}
public SqlClearCursorRequest(Mode mode, String cursor) {
super(mode);
super(new RequestInfo(mode));
this.cursor = cursor;
}
@ -97,7 +98,7 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// This is needed just to test round-trip compatibility with proto.SqlClearCursorRequest
return new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest(mode(), cursor).toXContent(builder, params);
return new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest(cursor, requestInfo()).toXContent(builder, params);
}
public static SqlClearCursorRequest fromXContent(XContentParser parser, Mode mode) {

View File

@ -16,7 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.io.IOException;
@ -46,9 +46,9 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
public SqlQueryRequest() {
}
public SqlQueryRequest(Mode mode, String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, String cursor) {
super(mode, query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout);
public SqlQueryRequest(String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, String cursor, RequestInfo requestInfo) {
super(query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout, requestInfo);
this.cursor = cursor;
}
@ -110,13 +110,13 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// This is needed just to test round-trip compatibility with proto.SqlQueryRequest
return new org.elasticsearch.xpack.sql.proto.SqlQueryRequest(mode(), query(), params(), timeZone(), fetchSize(),
requestTimeout(), pageTimeout(), filter(), cursor()).toXContent(builder, params);
return new org.elasticsearch.xpack.sql.proto.SqlQueryRequest(query(), params(), timeZone(), fetchSize(), requestTimeout(),
pageTimeout(), filter(), cursor(), requestInfo()).toXContent(builder, params);
}
public static SqlQueryRequest fromXContent(XContentParser parser, Mode mode) {
public static SqlQueryRequest fromXContent(XContentParser parser, RequestInfo requestInfo) {
SqlQueryRequest request = PARSER.apply(parser, null);
request.mode(mode);
request.requestInfo(requestInfo);
return request;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.util.Collections;
@ -24,14 +25,14 @@ public class SqlQueryRequestBuilder extends ActionRequestBuilder<SqlQueryRequest
public SqlQueryRequestBuilder(ElasticsearchClient client, SqlQueryAction action) {
this(client, action, "", Collections.emptyList(), null, Protocol.TIME_ZONE, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT,
Protocol.PAGE_TIMEOUT, "", Mode.PLAIN);
Protocol.PAGE_TIMEOUT, "", new RequestInfo(Mode.PLAIN));
}
public SqlQueryRequestBuilder(ElasticsearchClient client, SqlQueryAction action, String query, List<SqlTypedParamValue> params,
QueryBuilder filter, TimeZone timeZone, int fetchSize, TimeValue requestTimeout,
TimeValue pageTimeout, String nextPageInfo, Mode mode) {
super(client, action, new SqlQueryRequest(mode, query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout,
nextPageInfo));
TimeValue pageTimeout, String nextPageInfo, RequestInfo requestInfo) {
super(client, action, new SqlQueryRequest(query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout, nextPageInfo,
requestInfo));
}
public SqlQueryRequestBuilder query(String query) {

View File

@ -14,8 +14,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.proto.SqlQueryRequest;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.io.IOException;
import java.util.List;
@ -32,9 +33,9 @@ public class SqlTranslateRequest extends AbstractSqlQueryRequest {
public SqlTranslateRequest() {
}
public SqlTranslateRequest(Mode mode, String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
super(mode, query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout);
public SqlTranslateRequest(String query, List<SqlTypedParamValue> params, QueryBuilder filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
super(query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout, requestInfo);
}
public SqlTranslateRequest(StreamInput in) throws IOException {
@ -57,17 +58,15 @@ public class SqlTranslateRequest extends AbstractSqlQueryRequest {
public static SqlTranslateRequest fromXContent(XContentParser parser, Mode mode) {
SqlTranslateRequest request = PARSER.apply(parser, null);
request.mode(mode);
request.requestInfo(new RequestInfo(mode));
return request;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// This is needed just to test parsing of SqlTranslateRequest, so we can reuse SqlQuerySerialization
return new SqlQueryRequest(mode(), query(), params(), timeZone(), fetchSize(),
requestTimeout(), pageTimeout(), filter(), null).toXContent(builder, params);
return new SqlQueryRequest(query(), params(), timeZone(), fetchSize(), requestTimeout(),
pageTimeout(), filter(), null, requestInfo()).toXContent(builder, params);
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import java.util.Collections;
@ -22,14 +23,15 @@ import java.util.TimeZone;
*/
public class SqlTranslateRequestBuilder extends ActionRequestBuilder<SqlTranslateRequest, SqlTranslateResponse> {
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action) {
this(client, action, Mode.PLAIN, null, null, Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE,
Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT);
this(client, action, null, null, Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT,
Protocol.PAGE_TIMEOUT, new RequestInfo(Mode.PLAIN));
}
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action, Mode mode, String query,
QueryBuilder filter, List<SqlTypedParamValue> params, TimeZone timeZone, int fetchSize,
TimeValue requestTimeout, TimeValue pageTimeout) {
super(client, action, new SqlTranslateRequest(mode, query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout));
public SqlTranslateRequestBuilder(ElasticsearchClient client, SqlTranslateAction action, String query, QueryBuilder filter,
List<SqlTypedParamValue> params, TimeZone timeZone, int fetchSize, TimeValue requestTimeout,
TimeValue pageTimeout, RequestInfo requestInfo) {
super(client, action,
new SqlTranslateRequest(query, params, filter, timeZone, fetchSize, requestTimeout, pageTimeout, requestInfo));
}
public SqlTranslateRequestBuilder query(String query) {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.junit.Before;
@ -26,14 +27,18 @@ import java.util.function.Supplier;
import static org.elasticsearch.xpack.sql.action.SqlTestUtils.randomFilter;
import static org.elasticsearch.xpack.sql.action.SqlTestUtils.randomFilterOrNull;
import static org.elasticsearch.xpack.sql.proto.RequestInfo.CLI;
import static org.elasticsearch.xpack.sql.proto.RequestInfo.CANVAS;
public class SqlQueryRequestTests extends AbstractSerializingTestCase<SqlQueryRequest> {
public Mode testMode;
public RequestInfo requestInfo;
public String clientId;
@Before
public void setup() {
testMode = randomFrom(Mode.values());
clientId = randomFrom(CLI, CANVAS, randomAlphaOfLengthBetween(10, 20));
requestInfo = new RequestInfo(randomFrom(Mode.values()), clientId);
}
@Override
@ -50,11 +55,15 @@ public class SqlQueryRequestTests extends AbstractSerializingTestCase<SqlQueryRe
@Override
protected SqlQueryRequest createTestInstance() {
return new SqlQueryRequest(testMode, randomAlphaOfLength(10), randomParameters(),
SqlTestUtils.randomFilterOrNull(random()), randomTimeZone(),
between(1, Integer.MAX_VALUE), randomTV(), randomTV(), randomAlphaOfLength(10)
return new SqlQueryRequest(randomAlphaOfLength(10), randomParameters(), SqlTestUtils.randomFilterOrNull(random()),
randomTimeZone(), between(1, Integer.MAX_VALUE),
randomTV(), randomTV(), randomAlphaOfLength(10), requestInfo
);
}
private RequestInfo randomRequestInfo() {
return new RequestInfo(randomFrom(Mode.values()), randomFrom(CLI, CANVAS, clientId));
}
public List<SqlTypedParamValue> randomParameters() {
if (randomBoolean()) {
@ -87,14 +96,14 @@ public class SqlQueryRequestTests extends AbstractSerializingTestCase<SqlQueryRe
@Override
protected SqlQueryRequest doParseInstance(XContentParser parser) {
return SqlQueryRequest.fromXContent(parser, testMode);
return SqlQueryRequest.fromXContent(parser, requestInfo);
}
@Override
protected SqlQueryRequest mutateInstance(SqlQueryRequest instance) {
@SuppressWarnings("unchecked")
Consumer<SqlQueryRequest> mutator = randomFrom(
request -> request.mode(randomValueOtherThan(request.mode(), () -> randomFrom(Mode.values()))),
request -> request.requestInfo(randomValueOtherThan(request.requestInfo(), this::randomRequestInfo)),
request -> request.query(randomValueOtherThan(request.query(), () -> randomAlphaOfLength(5))),
request -> request.params(randomValueOtherThan(request.params(), this::randomParameters)),
request -> request.timeZone(randomValueOtherThan(request.timeZone(), ESTestCase::randomTimeZone)),
@ -104,8 +113,9 @@ public class SqlQueryRequestTests extends AbstractSerializingTestCase<SqlQueryRe
() -> request.filter() == null ? randomFilter(random()) : randomFilterOrNull(random()))),
request -> request.cursor(randomValueOtherThan(request.cursor(), SqlQueryResponseTests::randomStringCursor))
);
SqlQueryRequest newRequest = new SqlQueryRequest(instance.mode(), instance.query(), instance.params(), instance.filter(),
instance.timeZone(), instance.fetchSize(), instance.requestTimeout(), instance.pageTimeout(), instance.cursor());
SqlQueryRequest newRequest = new SqlQueryRequest(instance.query(), instance.params(), instance.filter(),
instance.timeZone(), instance.fetchSize(), instance.requestTimeout(), instance.pageTimeout(), instance.cursor(),
instance.requestInfo());
mutator.accept(newRequest);
return newRequest;
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.junit.Before;
import java.io.IOException;
@ -35,8 +36,8 @@ public class SqlTranslateRequestTests extends AbstractSerializingTestCase<SqlTra
@Override
protected SqlTranslateRequest createTestInstance() {
return new SqlTranslateRequest(testMode, randomAlphaOfLength(10), Collections.emptyList(), randomFilterOrNull(random()),
randomTimeZone(), between(1, Integer.MAX_VALUE), randomTV(), randomTV());
return new SqlTranslateRequest(randomAlphaOfLength(10), Collections.emptyList(), randomFilterOrNull(random()),
randomTimeZone(), between(1, Integer.MAX_VALUE), randomTV(), randomTV(), new RequestInfo(testMode));
}
@Override
@ -76,8 +77,8 @@ public class SqlTranslateRequestTests extends AbstractSerializingTestCase<SqlTra
request -> request.filter(randomValueOtherThan(request.filter(),
() -> request.filter() == null ? randomFilter(random()) : randomFilterOrNull(random())))
);
SqlTranslateRequest newRequest = new SqlTranslateRequest(instance.mode(), instance.query(), instance.params(), instance.filter(),
instance.timeZone(), instance.fetchSize(), instance.requestTimeout(), instance.pageTimeout());
SqlTranslateRequest newRequest = new SqlTranslateRequest(instance.query(), instance.params(), instance.filter(),
instance.timeZone(), instance.fetchSize(), instance.requestTimeout(), instance.pageTimeout(), instance.requestInfo());
mutator.accept(newRequest);
return newRequest;
}

View File

@ -5,10 +5,10 @@
*/
package org.elasticsearch.xpack.sql.cli.command;
import org.elasticsearch.xpack.sql.action.CliFormatter;
import org.elasticsearch.xpack.sql.cli.CliTerminal;
import org.elasticsearch.xpack.sql.client.HttpClient;
import org.elasticsearch.xpack.sql.client.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.action.CliFormatter;
import org.elasticsearch.xpack.sql.proto.SqlQueryResponse;
import java.sql.SQLException;

View File

@ -49,6 +49,8 @@ public class ConnectionConfiguration {
public static final String PAGE_SIZE = "page.size";
private static final String PAGE_SIZE_DEFAULT = "1000";
public static final String CLIENT_ID = "client.id";
// Auth

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest;
import org.elasticsearch.xpack.sql.proto.SqlClearCursorResponse;
import org.elasticsearch.xpack.sql.proto.SqlQueryRequest;
import org.elasticsearch.xpack.sql.proto.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -35,6 +36,9 @@ import java.util.Collections;
import java.util.TimeZone;
import java.util.function.Function;
import static org.elasticsearch.xpack.sql.proto.RequestInfo.CLI;
import static org.elasticsearch.xpack.sql.client.ConnectionConfiguration.CLIENT_ID;
/**
* A specialized high-level REST client with support for SQL-related functions.
* Similar to JDBC and the underlying HTTP connection, this class is not thread-safe
@ -62,9 +66,10 @@ public class HttpClient {
public SqlQueryResponse queryInit(String query, int fetchSize) throws SQLException {
// TODO allow customizing the time zone - this is what session set/reset/get should be about
SqlQueryRequest sqlRequest = new SqlQueryRequest(Mode.PLAIN, query, Collections.emptyList(), null,
TimeZone.getTimeZone("UTC"), fetchSize, TimeValue.timeValueMillis(cfg.queryTimeout()),
TimeValue.timeValueMillis(cfg.pageTimeout()));
// method called only from CLI. "client.id" is set to "cli"
SqlQueryRequest sqlRequest = new SqlQueryRequest(query, Collections.emptyList(), null, TimeZone.getTimeZone("UTC"),
fetchSize, TimeValue.timeValueMillis(cfg.queryTimeout()), TimeValue.timeValueMillis(cfg.pageTimeout()),
new RequestInfo(Mode.PLAIN, CLI));
return query(sqlRequest);
}
@ -73,14 +78,15 @@ public class HttpClient {
}
public SqlQueryResponse nextPage(String cursor) throws SQLException {
SqlQueryRequest sqlRequest = new SqlQueryRequest(Mode.PLAIN, cursor, TimeValue.timeValueMillis(cfg.queryTimeout()),
TimeValue.timeValueMillis(cfg.pageTimeout()));
// method called only from CLI. "client.id" is set to "cli"
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(cfg.queryTimeout()),
TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.PLAIN, CLI));
return post(Protocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, SqlQueryResponse::fromXContent);
}
public boolean queryClose(String cursor) throws SQLException {
SqlClearCursorResponse response = post(Protocol.CLEAR_CURSOR_REST_ENDPOINT,
new SqlClearCursorRequest(Mode.PLAIN, cursor),
new SqlClearCursorRequest(cursor, new RequestInfo(Mode.PLAIN)),
SqlClearCursorResponse::fromXContent);
return response.isSucceeded();
}
@ -89,7 +95,9 @@ public class HttpClient {
CheckedFunction<XContentParser, Response, IOException> responseParser)
throws SQLException {
byte[] requestBytes = toXContent(request);
String query = "error_trace&mode=" + request.mode();
String query = "error_trace&mode=" +
request.mode() +
(request.clientId() != null ? "&" + CLIENT_ID + "=" + request.clientId() : "");
Tuple<XContentType, byte[]> response =
AccessController.doPrivileged((PrivilegedAction<ResponseOrException<Tuple<XContentType, byte[]>>>) () ->
JreHttpUrlConnection.http(path, query, cfg, con ->

View File

@ -16,14 +16,22 @@ import java.util.Objects;
*/
public abstract class AbstractSqlRequest implements ToXContentFragment {
private final Mode mode;
private final RequestInfo requestInfo;
protected AbstractSqlRequest(Mode mode) {
this.mode = mode;
protected AbstractSqlRequest(RequestInfo requestInfo) {
this.requestInfo = requestInfo;
}
public RequestInfo requestInfo() {
return requestInfo;
}
public Mode mode() {
return mode;
return requestInfo.mode();
}
public String clientId() {
return requestInfo.clientId();
}
@Override
@ -31,12 +39,12 @@ public abstract class AbstractSqlRequest implements ToXContentFragment {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractSqlRequest that = (AbstractSqlRequest) o;
return mode == that.mode;
return Objects.equals(requestInfo, that.requestInfo);
}
@Override
public int hashCode() {
return Objects.hash(mode);
return requestInfo.hashCode();
}
}

View File

@ -28,4 +28,5 @@ public final class Protocol {
*/
public static final String CLEAR_CURSOR_REST_ENDPOINT = "/_xpack/sql/close";
public static final String SQL_QUERY_REST_ENDPOINT = "/_xpack/sql";
public static final String SQL_STATS_REST_ENDPOINT = "/_xpack/sql/stats";
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.proto;
import java.util.Objects;
public class RequestInfo {
public static final String CLI = "cli";
public static final String CANVAS = "canvas";
private Mode mode;
private String clientId;
public RequestInfo(Mode mode) {
this(mode, null);
}
public RequestInfo(Mode mode, String clientId) {
this.mode = mode;
this.clientId = clientId;
}
public Mode mode() {
return mode;
}
public void mode(Mode mode) {
this.mode = mode;
}
public String clientId() {
return clientId;
}
public void clientId(String clientId) {
this.clientId = clientId;
}
@Override
public int hashCode() {
return Objects.hash(mode, clientId);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RequestInfo that = (RequestInfo) o;
return Objects.equals(mode, that.mode) && Objects.equals(clientId, that.clientId);
}
}

View File

@ -17,8 +17,8 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
private final String cursor;
public SqlClearCursorRequest(Mode mode, String cursor) {
super(mode);
public SqlClearCursorRequest(String cursor, RequestInfo requestInfo) {
super(requestInfo);
this.cursor = cursor;
}

View File

@ -33,9 +33,9 @@ public class SqlQueryRequest extends AbstractSqlRequest {
private final List<SqlTypedParamValue> params;
public SqlQueryRequest(Mode mode, String query, List<SqlTypedParamValue> params, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, ToXContent filter, String cursor) {
super(mode);
public SqlQueryRequest(String query, List<SqlTypedParamValue> params, TimeZone timeZone, int fetchSize,
TimeValue requestTimeout, TimeValue pageTimeout, ToXContent filter, String cursor, RequestInfo requestInfo) {
super(requestInfo);
this.query = query;
this.params = params;
this.timeZone = timeZone;
@ -46,16 +46,16 @@ public class SqlQueryRequest extends AbstractSqlRequest {
this.cursor = cursor;
}
public SqlQueryRequest(Mode mode, String query, List<SqlTypedParamValue> params, ToXContent filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout) {
this(mode, query, params, timeZone, fetchSize, requestTimeout, pageTimeout, filter, null);
public SqlQueryRequest(String query, List<SqlTypedParamValue> params, ToXContent filter, TimeZone timeZone,
int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
this(query, params, timeZone, fetchSize, requestTimeout, pageTimeout, filter, null, requestInfo);
}
public SqlQueryRequest(Mode mode, String cursor, TimeValue requestTimeout, TimeValue pageTimeout) {
this(mode, "", Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, requestTimeout, pageTimeout, null, cursor);
public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
this("", Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, requestTimeout, pageTimeout,
null, cursor, requestInfo);
}
/**
* The key that must be sent back to SQL to access the next page of
* results.

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.sql.plugin.SqlStatsAction;
import org.elasticsearch.xpack.sql.plugin.SqlStatsRequest;
import org.elasticsearch.xpack.sql.plugin.SqlStatsResponse;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class SqlFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final XPackLicenseState licenseState;
private Client client;
@Inject
public SqlFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, Client client) {
this.enabled = XPackSettings.SQL_ENABLED.get(settings);
this.licenseState = licenseState;
this.client = client;
}
@Override
public String name() {
return XPackField.SQL;
}
@Override
public String description() {
return "SQL access to Elasticsearch";
}
@Override
public boolean available() {
return licenseState != null && licenseState.isSqlAllowed();
}
@Override
public boolean enabled() {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
if (enabled) {
SqlStatsRequest request = new SqlStatsRequest();
request.includeStats(true);
client.execute(SqlStatsAction.INSTANCE, request, ActionListener.wrap(r -> {
List<Counters> countersPerNode = r.getNodes()
.stream()
.map(SqlStatsResponse.NodeStatsResponse::getStats)
.filter(Objects::nonNull)
.collect(Collectors.toList());
Counters mergedCounters = Counters.merge(countersPerNode);
listener.onResponse(new SqlFeatureSetUsage(available(), enabled(), mergedCounters.toNestedMap()));
}, listener::onFailure));
} else {
listener.onResponse(new SqlFeatureSetUsage(available(), enabled(), Collections.emptyMap()));
}
}
}

View File

@ -45,7 +45,6 @@ import org.elasticsearch.xpack.sql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.sql.plan.logical.With;
import org.elasticsearch.xpack.sql.rule.Rule;
import org.elasticsearch.xpack.sql.rule.RuleExecutor;
import org.elasticsearch.xpack.sql.tree.Node;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypeConversion;
import org.elasticsearch.xpack.sql.type.DataTypes;
@ -66,18 +65,9 @@ import java.util.TimeZone;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
public class Analyzer extends RuleExecutor<LogicalPlan> {
/**
* Verify a plan.
*/
public static Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
Collection<Failure> failures = Verifier.verify(plan);
return failures.stream().collect(toMap(Failure::source, Failure::message));
}
/**
* Valid functions.
*/
@ -91,11 +81,16 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
* that deal with date and time.
*/
private final TimeZone timeZone;
/**
* The verifier has the role of checking the analyzed tree for failures and build a list of failures.
*/
private final Verifier verifier;
public Analyzer(FunctionRegistry functionRegistry, IndexResolution results, TimeZone timeZone) {
public Analyzer(FunctionRegistry functionRegistry, IndexResolution results, TimeZone timeZone, Verifier verifier) {
this.functionRegistry = functionRegistry;
this.indexResolution = results;
this.timeZone = timeZone;
this.verifier = verifier;
}
@Override
@ -132,7 +127,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
}
public LogicalPlan verify(LogicalPlan plan) {
Collection<Failure> failures = Verifier.verify(plan);
Collection<Failure> failures = verifier.verify(plan);
if (!failures.isEmpty()) {
throw new VerificationException(failures);
}
@ -239,7 +234,6 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
return containsAggregate(singletonList(exp));
}
private static class CTESubstitution extends AnalyzeRule<With> {
@Override

View File

@ -23,14 +23,20 @@ import org.elasticsearch.xpack.sql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.sql.plan.logical.Aggregate;
import org.elasticsearch.xpack.sql.plan.logical.Distinct;
import org.elasticsearch.xpack.sql.plan.logical.Filter;
import org.elasticsearch.xpack.sql.plan.logical.Limit;
import org.elasticsearch.xpack.sql.plan.logical.LocalRelation;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.plan.logical.OrderBy;
import org.elasticsearch.xpack.sql.plan.logical.Project;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.stats.FeatureMetric;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.tree.Node;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@ -42,10 +48,25 @@ import java.util.Set;
import java.util.function.Consumer;
import static java.lang.String.format;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.COMMAND;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.GROUPBY;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.HAVING;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.LIMIT;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.LOCAL;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.ORDERBY;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.WHERE;
final class Verifier {
private Verifier() {}
/**
* The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check.
* It is created in the plan executor along with the metrics instance passed as constructor parameter.
*/
public final class Verifier {
private final Metrics metrics;
public Verifier(Metrics metrics) {
this.metrics = metrics;
}
static class Failure {
private final Node<?> source;
@ -93,7 +114,12 @@ final class Verifier {
return new Failure(source, format(Locale.ROOT, message, args));
}
static Collection<Failure> verify(LogicalPlan plan) {
public Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
Collection<Failure> failures = verify(plan);
return failures.stream().collect(toMap(Failure::source, Failure::message));
}
Collection<Failure> verify(LogicalPlan plan) {
Set<Failure> failures = new LinkedHashSet<>();
// start bottom-up
@ -103,7 +129,7 @@ final class Verifier {
return;
}
// if the children are unresolved, this node will also so counting it will only add noise
// if the children are unresolved, so will this node; counting it will only add noise
if (!p.childrenResolved()) {
return;
}
@ -212,6 +238,33 @@ final class Verifier {
failures.addAll(localFailures);
});
}
// gather metrics
if (failures.isEmpty()) {
BitSet b = new BitSet(FeatureMetric.values().length);
plan.forEachDown(p -> {
if (p instanceof Aggregate) {
b.set(GROUPBY.ordinal());
} else if (p instanceof OrderBy) {
b.set(ORDERBY.ordinal());
} else if (p instanceof Filter) {
if (((Filter) p).child() instanceof Aggregate) {
b.set(HAVING.ordinal());
} else {
b.set(WHERE.ordinal());
}
} else if (p instanceof Limit) {
b.set(LIMIT.ordinal());
} else if (p instanceof LocalRelation) {
b.set(LOCAL.ordinal());
} else if (p instanceof Command) {
b.set(COMMAND.ordinal());
}
});
for (int i = b.nextSetBit(0); i >= 0; i = b.nextSetBit(i + 1)) {
metrics.inc(FeatureMetric.values()[i]);
}
}
return failures;
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
import org.elasticsearch.xpack.sql.execution.search.SourceGenerator;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
@ -23,6 +24,7 @@ import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import java.util.List;
@ -34,8 +36,11 @@ public class PlanExecutor {
private final IndexResolver indexResolver;
private final PreAnalyzer preAnalyzer;
private final Verifier verifier;
private final Optimizer optimizer;
private final Planner planner;
private final Metrics metrics;
public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
this.client = client;
@ -43,14 +48,17 @@ public class PlanExecutor {
this.indexResolver = indexResolver;
this.functionRegistry = new FunctionRegistry();
this.metrics = new Metrics();
this.preAnalyzer = new PreAnalyzer();
this.verifier = new Verifier(metrics);
this.optimizer = new Optimizer();
this.planner = new Planner();
}
private SqlSession newSession(Configuration cfg) {
return new SqlSession(cfg, client, functionRegistry, indexResolver, preAnalyzer, optimizer, planner);
return new SqlSession(cfg, client, functionRegistry, indexResolver, preAnalyzer, verifier, optimizer, planner);
}
public void searchSource(Configuration cfg, String sql, List<SqlTypedParamValue> params, ActionListener<SearchSourceBuilder> listener) {
@ -75,4 +83,8 @@ public class PlanExecutor {
public void cleanCursor(Configuration cfg, Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(cfg, client, listener);
}
public Metrics metrics() {
return this.metrics;
}
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.plan.QueryPlan;
@ -131,7 +130,7 @@ public class Explain extends Command {
// check errors manually to see how far the plans work out
else {
// no analysis failure, can move on
if (Analyzer.verifyFailures(analyzedPlan).isEmpty()) {
if (session.verifier().verifyFailures(analyzedPlan).isEmpty()) {
session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> {
if (type == Type.OPTIMIZED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan)));

View File

@ -23,16 +23,21 @@ import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursors;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.xpack.sql.proto.RequestInfo.CANVAS;
import static org.elasticsearch.xpack.sql.proto.RequestInfo.CLI;
public class RestSqlQueryAction extends BaseRestHandler {
private static String CLIENT_ID = "client.id";
public RestSqlQueryAction(Settings settings, RestController controller) {
super(settings);
@ -44,7 +49,16 @@ public class RestSqlQueryAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
SqlQueryRequest sqlRequest;
try (XContentParser parser = request.contentOrSourceParamParser()) {
sqlRequest = SqlQueryRequest.fromXContent(parser,Mode.fromString(request.param("mode")));
String clientId = request.param(CLIENT_ID);
if (clientId != null) {
clientId = clientId.toLowerCase(Locale.ROOT);
if (!clientId.equals(CLI) && !clientId.equals(CANVAS)) {
clientId = null;
}
}
sqlRequest = SqlQueryRequest.fromXContent(parser,
new RequestInfo(Mode.fromString(request.param("mode")), clientId));
}
/*

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xpack.sql.proto.Protocol;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestSqlStatsAction extends BaseRestHandler {
protected RestSqlStatsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(GET, Protocol.SQL_STATS_REST_ENDPOINT, this);
}
@Override
public String getName() {
return "xpack_sql_stats_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
SqlStatsRequest request = new SqlStatsRequest();
return channel -> client.execute(SqlStatsAction.INSTANCE, request, new RestActions.NodesResponseRestListener<>(channel));
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -31,12 +32,14 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.sql.SqlFeatureSet;
import org.elasticsearch.xpack.sql.action.SqlClearCursorAction;
import org.elasticsearch.xpack.sql.action.SqlQueryAction;
import org.elasticsearch.xpack.sql.action.SqlTranslateAction;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -101,6 +104,13 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(sqlLicenseChecker, indexResolver, new PlanExecutor(client, indexResolver, namedWriteableRegistry));
}
@Override
public Collection<Module> createGuiceModules() {
List<Module> modules = new ArrayList<>();
modules.add(b -> XPackPlugin.bindFeatureSet(b, SqlFeatureSet.class));
return modules;
}
@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings,
@ -113,7 +123,8 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(new RestSqlQueryAction(settings, restController),
new RestSqlTranslateAction(settings, restController),
new RestSqlClearCursorAction(settings, restController));
new RestSqlClearCursorAction(settings, restController),
new RestSqlStatsAction(settings, restController));
}
@Override
@ -124,6 +135,7 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(new ActionHandler<>(SqlQueryAction.INSTANCE, TransportSqlQueryAction.class),
new ActionHandler<>(SqlTranslateAction.INSTANCE, TransportSqlTranslateAction.class),
new ActionHandler<>(SqlClearCursorAction.INSTANCE, TransportSqlClearCursorAction.class));
new ActionHandler<>(SqlClearCursorAction.INSTANCE, TransportSqlClearCursorAction.class),
new ActionHandler<>(SqlStatsAction.INSTANCE, TransportSqlStatsAction.class));
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.Action;
public class SqlStatsAction extends Action<SqlStatsResponse> {
public static final SqlStatsAction INSTANCE = new SqlStatsAction();
public static final String NAME = "cluster:monitor/xpack/sql/stats/dist";
private SqlStatsAction() {
super(NAME);
}
@Override
public SqlStatsResponse newResponse() {
return new SqlStatsResponse();
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Request to gather usage statistics
*/
public class SqlStatsRequest extends BaseNodesRequest<SqlStatsRequest> {
private boolean includeStats;
public SqlStatsRequest() {
}
public boolean includeStats() {
return includeStats;
}
public void includeStats(boolean includeStats) {
this.includeStats = includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(includeStats);
}
@Override
public String toString() {
return "sql_stats";
}
static class NodeStatsRequest extends BaseNodeRequest {
boolean includeStats;
NodeStatsRequest() {}
NodeStatsRequest(SqlStatsRequest request, String nodeId) {
super(nodeId);
includeStats = request.includeStats();
}
public boolean includeStats() {
return includeStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeStats = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(includeStats);
}
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import java.io.IOException;
import java.util.List;
public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeStatsResponse> implements ToXContentObject {
public SqlStatsResponse() {
}
public SqlStatsResponse(ClusterName clusterName, List<NodeStatsResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeStatsResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeStatsResponse::readNodeResponse);
}
@Override
protected void writeNodesTo(StreamOutput out, List<NodeStatsResponse> nodes) throws IOException {
out.writeStreamableList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray("stats");
for (NodeStatsResponse node : getNodes()) {
node.toXContent(builder, params);
}
builder.endArray();
return builder;
}
public static class NodeStatsResponse extends BaseNodeResponse implements ToXContentObject {
private Counters stats;
public NodeStatsResponse() {
}
public NodeStatsResponse(DiscoveryNode node) {
super(node);
}
public Counters getStats() {
return stats;
}
public void setStats(Counters stats) {
this.stats = stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
stats = Counters.read(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(stats != null);
if (stats != null) {
stats.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (stats != null && stats.hasCounters()) {
builder.field("stats", stats.toNestedMap());
}
builder.endObject();
return builder;
}
static SqlStatsResponse.NodeStatsResponse readNodeResponse(StreamInput in) throws IOException {
SqlStatsResponse.NodeStatsResponse node = new SqlStatsResponse.NodeStatsResponse();
node.readFrom(in);
return node;
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.stats.QueryMetric;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.ArrayList;
@ -58,13 +59,26 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
// the rest having default values (since the query is already created)
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(), request.requestTimeout(), request.pageTimeout(),
request.filter());
// mode() shouldn't be null
QueryMetric metric = QueryMetric.from(request.mode(), request.clientId());
planExecutor.metrics().total(metric);
if (Strings.hasText(request.cursor()) == false) {
planExecutor.sql(cfg, request.query(), request.params(),
ActionListener.wrap(rowSet -> listener.onResponse(createResponse(request, rowSet)), listener::onFailure));
ActionListener.wrap(rowSet -> listener.onResponse(createResponse(request, rowSet)),
e -> {
planExecutor.metrics().failed(metric);
listener.onFailure(e);
}));
} else {
planExecutor.metrics().paging(metric);
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet, null)), listener::onFailure));
ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet, null)),
e -> {
planExecutor.metrics().failed(metric);
listener.onFailure(e);
}));
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.util.List;
/**
* Performs the stats operation.
*/
public class TransportSqlStatsAction extends TransportNodesAction<SqlStatsRequest, SqlStatsResponse,
SqlStatsRequest.NodeStatsRequest, SqlStatsResponse.NodeStatsResponse> {
// the plan executor holds the metrics
private final PlanExecutor planExecutor;
@Inject
public TransportSqlStatsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor) {
super(SqlStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
SqlStatsRequest::new, SqlStatsRequest.NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT,
SqlStatsResponse.NodeStatsResponse.class);
this.planExecutor = planExecutor;
}
@Override
protected SqlStatsResponse newResponse(SqlStatsRequest request, List<SqlStatsResponse.NodeStatsResponse> nodes,
List<FailedNodeException> failures) {
return new SqlStatsResponse(clusterService.getClusterName(), nodes, failures);
}
@Override
protected SqlStatsRequest.NodeStatsRequest newNodeRequest(String nodeId, SqlStatsRequest request) {
return new SqlStatsRequest.NodeStatsRequest(request, nodeId);
}
@Override
protected SqlStatsResponse.NodeStatsResponse newNodeResponse() {
return new SqlStatsResponse.NodeStatsResponse();
}
@Override
protected SqlStatsResponse.NodeStatsResponse nodeOperation(SqlStatsRequest.NodeStatsRequest request) {
SqlStatsResponse.NodeStatsResponse statsResponse = new SqlStatsResponse.NodeStatsResponse(clusterService.localNode());
statsResponse.setStats(planExecutor.metrics().stats());
return statsResponse;
}
}

View File

@ -38,6 +38,7 @@ public class TransportSqlTranslateAction extends HandledTransportAction<SqlTrans
protected void doExecute(Task task, SqlTranslateRequest request, ActionListener<SqlTranslateResponse> listener) {
sqlLicenseChecker.checkIfSqlAllowed(request.mode());
planExecutor.metrics().translate();
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(),
request.requestTimeout(), request.pageTimeout(), request.filter());

View File

@ -192,4 +192,4 @@ public abstract class RuleExecutor<TreeType extends Node<TreeType>> {
return new ExecutionInfo(plan, currentPlan, transformations);
}
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer.PreAnalysis;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
import org.elasticsearch.xpack.sql.analysis.index.MappingException;
@ -36,6 +37,7 @@ public class SqlSession {
private final FunctionRegistry functionRegistry;
private final IndexResolver indexResolver;
private final PreAnalyzer preAnalyzer;
private final Verifier verifier;
private final Optimizer optimizer;
private final Planner planner;
@ -44,12 +46,13 @@ public class SqlSession {
public SqlSession(SqlSession other) {
this(other.settings, other.client, other.functionRegistry, other.indexResolver,
other.preAnalyzer, other.optimizer,other.planner);
other.preAnalyzer, other.verifier, other.optimizer, other.planner);
}
public SqlSession(Configuration settings, Client client, FunctionRegistry functionRegistry,
IndexResolver indexResolver,
PreAnalyzer preAnalyzer,
Verifier verifier,
Optimizer optimizer,
Planner planner) {
this.client = client;
@ -59,6 +62,7 @@ public class SqlSession {
this.preAnalyzer = preAnalyzer;
this.optimizer = optimizer;
this.planner = planner;
this.verifier = verifier;
this.settings = settings;
}
@ -82,6 +86,10 @@ public class SqlSession {
public Optimizer optimizer() {
return optimizer;
}
public Verifier verifier() {
return verifier;
}
private LogicalPlan doParse(String sql, List<SqlTypedParamValue> params) {
return new SqlParser().createStatement(sql, params);
@ -94,9 +102,8 @@ public class SqlSession {
}
preAnalyze(parsed, c -> {
Analyzer analyzer = new Analyzer(functionRegistry, c, settings.timeZone());
LogicalPlan p = analyzer.analyze(parsed);
return verify ? analyzer.verify(p) : p;
Analyzer analyzer = new Analyzer(functionRegistry, c, settings.timeZone(), verifier);
return analyzer.analyze(parsed, verify);
}, listener);
}
@ -107,7 +114,7 @@ public class SqlSession {
}
preAnalyze(parsed, r -> {
Analyzer analyzer = new Analyzer(functionRegistry, r, settings.timeZone());
Analyzer analyzer = new Analyzer(functionRegistry, r, settings.timeZone(), verifier);
return analyzer.debugAnalyze(parsed);
}, listener);
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.stats;
import java.util.Locale;
public enum FeatureMetric {
COMMAND,
GROUPBY,
HAVING,
JOIN,
LIMIT,
LOCAL,
ORDERBY,
SUBSELECT,
WHERE;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.stats;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
/**
* Class encapsulating the metrics collected for ES SQL
*/
public class Metrics {
private enum OperationType {
FAILED, PAGING, TOTAL;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}
// map that holds total/paging/failed counters for each client type (rest, cli, jdbc, odbc...)
private final Map<QueryMetric, Map<OperationType, CounterMetric>> opsByTypeMetrics;
// map that holds one counter per sql query "feature" (having, limit, order by, group by...)
private final Map<FeatureMetric, CounterMetric> featuresMetrics;
// counter for "translate" requests
private final CounterMetric translateMetric;
protected static String QPREFIX = "queries.";
protected static String FPREFIX = "features.";
protected static String TRANSLATE_METRIC = "queries.translate.count";
public Metrics() {
Map<QueryMetric, Map<OperationType, CounterMetric>> qMap = new LinkedHashMap<>();
for (QueryMetric metric : QueryMetric.values()) {
Map<OperationType, CounterMetric> metricsMap = new LinkedHashMap<>(OperationType.values().length);
for (OperationType type : OperationType.values()) {
metricsMap.put(type, new CounterMetric());
}
qMap.put(metric, Collections.unmodifiableMap(metricsMap));
}
opsByTypeMetrics = Collections.unmodifiableMap(qMap);
Map<FeatureMetric, CounterMetric> fMap = new LinkedHashMap<>(FeatureMetric.values().length);
for (FeatureMetric featureMetric : FeatureMetric.values()) {
fMap.put(featureMetric, new CounterMetric());
}
featuresMetrics = Collections.unmodifiableMap(fMap);
translateMetric = new CounterMetric();
}
/**
* Increments the "total" counter for a metric
* This method should be called only once per query.
*/
public void total(QueryMetric metric) {
inc(metric, OperationType.TOTAL);
}
/**
* Increments the "failed" counter for a metric
*/
public void failed(QueryMetric metric) {
inc(metric, OperationType.FAILED);
}
/**
* Increments the "paging" counter for a metric
*/
public void paging(QueryMetric metric) {
inc(metric, OperationType.PAGING);
}
/**
* Increments the "translate" metric
*/
public void translate() {
translateMetric.inc();
}
private void inc(QueryMetric metric, OperationType op) {
this.opsByTypeMetrics.get(metric).get(op).inc();
}
public void inc(FeatureMetric metric) {
this.featuresMetrics.get(metric).inc();
}
public Counters stats() {
Counters counters = new Counters();
// queries metrics
for (Entry<QueryMetric, Map<OperationType, CounterMetric>> entry : opsByTypeMetrics.entrySet()) {
for (OperationType type : OperationType.values()) {
counters.inc(QPREFIX + entry.getKey().toString() + "." + type.toString(), entry.getValue().get(type).count());
counters.inc(QPREFIX + "_all." + type.toString(), entry.getValue().get(type).count());
}
}
// features metrics
for (Entry<FeatureMetric, CounterMetric> entry : featuresMetrics.entrySet()) {
counters.inc(FPREFIX + entry.getKey().toString(), entry.getValue().count());
}
// translate operation metric
counters.inc(TRANSLATE_METRIC, translateMetric.count());
return counters;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.stats;
import org.elasticsearch.xpack.sql.proto.Mode;
import java.util.Locale;
public enum QueryMetric {
CANVAS, CLI, JDBC, ODBC, REST;
public static QueryMetric fromString(String metric) {
try {
return QueryMetric.valueOf(metric.toUpperCase(Locale.ROOT));
} catch (Exception e) {
return REST;
}
}
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
public static QueryMetric from(Mode mode, String clientId) {
return fromString(mode == Mode.PLAIN ? clientId : mode.toString());
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.sql.plugin.SqlStatsAction;
import org.elasticsearch.xpack.sql.plugin.SqlStatsResponse;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SqlFeatureSetTests extends ESTestCase {
private XPackLicenseState licenseState;
private Client client;
@Before
public void init() throws Exception {
licenseState = mock(XPackLicenseState.class);
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);
}
public void testAvailable() {
SqlFeatureSet featureSet = new SqlFeatureSet(Settings.EMPTY, licenseState, client);
boolean available = randomBoolean();
when(licenseState.isSqlAllowed()).thenReturn(available);
assertThat(featureSet.available(), is(available));
}
public void testEnabled() {
boolean enabled = randomBoolean();
Settings.Builder settings = Settings.builder();
if (enabled) {
if (randomBoolean()) {
settings.put("xpack.sql.enabled", enabled);
}
} else {
settings.put("xpack.sql.enabled", enabled);
}
SqlFeatureSet featureSet = new SqlFeatureSet(settings.build(), licenseState, client);
assertThat(featureSet.enabled(), is(enabled));
}
@SuppressWarnings("unchecked")
public void testUsageStats() throws Exception {
doAnswer(mock -> {
ActionListener<SqlStatsResponse> listener =
(ActionListener<SqlStatsResponse>) mock.getArguments()[2];
List<SqlStatsResponse.NodeStatsResponse> nodes = new ArrayList<>();
DiscoveryNode first = new DiscoveryNode("first", buildNewFakeTransportAddress(), Version.CURRENT);
SqlStatsResponse.NodeStatsResponse firstNode = new SqlStatsResponse.NodeStatsResponse(first);
Counters firstCounters = new Counters();
firstCounters.inc("foo.foo", 1);
firstCounters.inc("foo.bar.baz", 1);
firstNode.setStats(firstCounters);
nodes.add(firstNode);
DiscoveryNode second = new DiscoveryNode("second", buildNewFakeTransportAddress(), Version.CURRENT);
SqlStatsResponse.NodeStatsResponse secondNode = new SqlStatsResponse.NodeStatsResponse(second);
Counters secondCounters = new Counters();
secondCounters.inc("spam", 1);
secondCounters.inc("foo.bar.baz", 4);
secondNode.setStats(secondCounters);
nodes.add(secondNode);
listener.onResponse(new SqlStatsResponse(new ClusterName("whatever"), nodes, Collections.emptyList()));
return null;
}).when(client).execute(eq(SqlStatsAction.INSTANCE), any(), any());
PlainActionFuture<SqlFeatureSet.Usage> future = new PlainActionFuture<>();
new SqlFeatureSet(Settings.EMPTY, licenseState, client).usage(future);
SqlFeatureSetUsage sqlUsage = (SqlFeatureSetUsage) future.get();
long fooBarBaz = ObjectPath.eval("foo.bar.baz", sqlUsage.stats());
long fooFoo = ObjectPath.eval("foo.foo", sqlUsage.stats());
long spam = ObjectPath.eval("spam", sqlUsage.stats());
assertThat(sqlUsage.stats().keySet(), containsInAnyOrder("foo", "spam"));
assertThat(fooBarBaz, is(5L));
assertThat(fooFoo, is(1L));
assertThat(spam, is(1L));
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.plan.logical.Project;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -40,16 +41,18 @@ public class FieldAttributeTests extends ESTestCase {
private IndexResolution getIndexResult;
private FunctionRegistry functionRegistry;
private Analyzer analyzer;
private Verifier verifier;
public FieldAttributeTests() {
parser = new SqlParser();
functionRegistry = new FunctionRegistry();
verifier = new Verifier(new Metrics());
Map<String, EsField> mapping = TypesTests.loadMapping("mapping-multi-field-variation.json");
EsIndex test = new EsIndex("test", mapping);
getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"));
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"), verifier);
}
private LogicalPlan plan(String sql) {
@ -166,7 +169,7 @@ public class FieldAttributeTests extends ESTestCase {
EsIndex index = new EsIndex("test", mapping);
getIndexResult = IndexResolution.valid(index);
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"));
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"), verifier);
VerificationException ex = expectThrows(VerificationException.class, () -> plan("SELECT test.bar FROM test"));
assertEquals(

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolverTests;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -29,7 +30,7 @@ public class VerifierErrorMessagesTests extends ESTestCase {
}
private String error(IndexResolution getIndexResult, String sql) {
Analyzer analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"), new Verifier(new Metrics()));
AnalysisException e = expectThrows(AnalysisException.class, () -> analyzer.analyze(parser.createStatement(sql), true));
assertTrue(e.getMessage().startsWith("Found "));
String header = "Found 1 problem(s)\nline ";
@ -43,7 +44,7 @@ public class VerifierErrorMessagesTests extends ESTestCase {
}
private LogicalPlan accept(IndexResolution resolution, String sql) {
Analyzer analyzer = new Analyzer(new FunctionRegistry(), resolution, TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), resolution, TimeZone.getTimeZone("UTC"), new Verifier(new Metrics()));
return analyzer.analyze(parser.createStatement(sql), true);
}

View File

@ -7,11 +7,13 @@ package org.elasticsearch.xpack.sql.optimizer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -34,7 +36,7 @@ public class OptimizerRunTests extends ESTestCase {
EsIndex test = new EsIndex("test", mapping);
getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"));
analyzer = new Analyzer(functionRegistry, getIndexResult, TimeZone.getTimeZone("UTC"), new Verifier(new Metrics()));
optimizer = new Optimizer();
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
@ -16,6 +17,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.TypesTests;
import java.util.TimeZone;
@ -33,7 +35,8 @@ public class SysCatalogsTests extends ESTestCase {
@SuppressWarnings({ "rawtypes", "unchecked" })
private Tuple<Command, SqlSession> sql(String sql) {
EsIndex test = new EsIndex("test", TypesTests.loadMapping("mapping-multi-field-with-nested.json", true));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"),
new Verifier(new Metrics()));
Command cmd = (Command) analyzer.analyze(parser.createStatement(sql), true);
IndexResolver resolver = mock(IndexResolver.class);
@ -44,7 +47,7 @@ public class SysCatalogsTests extends ESTestCase {
return Void.TYPE;
}).when(resolver).resolveAsSeparateMappings(any(), any(), any());
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null);
return new Tuple<>(cmd, session);
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
@ -16,6 +17,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -39,7 +41,8 @@ public class SysParserTests extends ESTestCase {
@SuppressWarnings({ "rawtypes", "unchecked" })
private Tuple<Command, SqlSession> sql(String sql) {
EsIndex test = new EsIndex("test", mapping);
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"),
new Verifier(new Metrics()));
Command cmd = (Command) analyzer.analyze(parser.createStatement(sql), true);
IndexResolver resolver = mock(IndexResolver.class);
@ -50,7 +53,7 @@ public class SysParserTests extends ESTestCase {
return Void.TYPE;
}).when(resolver).resolveAsSeparateMappings(any(), any(), any());
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null);
return new Tuple<>(cmd, session);
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
@ -16,6 +17,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.TypesTests;
import java.util.TimeZone;
@ -28,11 +30,12 @@ public class SysTableTypesTests extends ESTestCase {
private Tuple<Command, SqlSession> sql(String sql) {
EsIndex test = new EsIndex("test", TypesTests.loadMapping("mapping-multi-field-with-nested.json", true));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"),
new Verifier(new Metrics()));
Command cmd = (Command) analyzer.analyze(parser.createStatement(sql), true);
IndexResolver resolver = mock(IndexResolver.class);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null);
return new Tuple<>(cmd, session);
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
@ -20,6 +21,7 @@ import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -234,13 +236,14 @@ public class SysTablesTests extends ESTestCase {
private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params) {
EsIndex test = new EsIndex("test", mapping);
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"));
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"),
new Verifier(new Metrics()));
Command cmd = (Command) analyzer.analyze(parser.createStatement(sql, params), true);
IndexResolver resolver = mock(IndexResolver.class);
when(resolver.clusterName()).thenReturn(CLUSTER_NAME);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null);
SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null);
return new Tuple<>(cmd, session);
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.planner;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
@ -16,6 +17,7 @@ import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.sql.plan.physical.LocalExec;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
import org.junit.AfterClass;
@ -40,7 +42,7 @@ public class QueryFolderTests extends ESTestCase {
Map<String, EsField> mapping = TypesTests.loadMapping("mapping-multi-field-variation.json");
EsIndex test = new EsIndex("test", mapping);
IndexResolution getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"));
analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"), new Verifier(new Metrics()));
optimizer = new Optimizer();
planner = new Planner();
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.planner;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.analysis.index.MappingException;
@ -26,6 +27,7 @@ import org.elasticsearch.xpack.sql.querydsl.query.RangeQuery;
import org.elasticsearch.xpack.sql.querydsl.query.ScriptQuery;
import org.elasticsearch.xpack.sql.querydsl.query.TermQuery;
import org.elasticsearch.xpack.sql.querydsl.query.TermsQuery;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
import org.elasticsearch.xpack.sql.util.DateUtils;
@ -50,7 +52,7 @@ public class QueryTranslatorTests extends ESTestCase {
Map<String, EsField> mapping = TypesTests.loadMapping("mapping-multi-field-variation.json");
EsIndex test = new EsIndex("test", mapping);
IndexResolution getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"));
analyzer = new Analyzer(new FunctionRegistry(), getIndexResult, TimeZone.getTimeZone("UTC"), new Verifier(new Metrics()));
}
@AfterClass

View File

@ -0,0 +1,249 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.stats;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.analysis.index.EsIndex;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolution;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.TypesTests;
import java.util.Map;
import java.util.TimeZone;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.COMMAND;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.GROUPBY;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.HAVING;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.LIMIT;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.LOCAL;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.ORDERBY;
import static org.elasticsearch.xpack.sql.stats.FeatureMetric.WHERE;
import static org.elasticsearch.xpack.sql.stats.Metrics.FPREFIX;
public class VerifierMetricsTests extends ESTestCase {
private SqlParser parser = new SqlParser();
private String[] commands = {"SHOW FUNCTIONS", "SHOW COLUMNS FROM library", "SHOW SCHEMAS",
"SHOW TABLES", "SYS CATALOGS", "SYS COLUMNS LIKE '%name'",
"SYS TABLES", "SYS TYPES"};
public void testWhereQuery() {
Counters c = sql("SELECT emp_no FROM test WHERE languages > 2");
assertEquals(1L, where(c));
assertEquals(0, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testLimitQuery() {
Counters c = sql("SELECT emp_no FROM test LIMIT 4");
assertEquals(0, where(c));
assertEquals(1L, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testGroupByQuery() {
Counters c = sql("SELECT languages, MAX(languages) FROM test GROUP BY languages");
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(1L, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testHavingQuery() {
Counters c = sql("SELECT UCASE(gender), MAX(languages) FROM test GROUP BY gender HAVING MAX(languages) > 3");
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(1L, groupby(c));
assertEquals(1L, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testOrderByQuery() {
Counters c = sql("SELECT UCASE(gender) FROM test ORDER BY emp_no");
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(1L, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testCommand() {
Counters c = sql(randomFrom("SHOW FUNCTIONS", "SHOW COLUMNS FROM library", "SHOW SCHEMAS",
"SHOW TABLES", "SYS CATALOGS", "SYS COLUMNS LIKE '%name'",
"SYS TABLES", "SYS TYPES"));
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(1L, command(c));
assertEquals(0, local(c));
}
public void testLocalQuery() {
Counters c = sql("SELECT CONCAT('Elastic','search')");
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(1L, local(c));
}
public void testWhereAndLimitQuery() {
Counters c = sql("SELECT emp_no FROM test WHERE languages > 2 LIMIT 5");
assertEquals(1L, where(c));
assertEquals(1L, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testWhereLimitGroupByQuery() {
Counters c = sql("SELECT languages FROM test WHERE languages > 2 GROUP BY languages LIMIT 5");
assertEquals(1L, where(c));
assertEquals(1L, limit(c));
assertEquals(1L, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testWhereLimitGroupByHavingQuery() {
Counters c = sql("SELECT languages FROM test WHERE languages > 2 GROUP BY languages HAVING MAX(languages) > 3 LIMIT 5");
assertEquals(1L, where(c));
assertEquals(1L, limit(c));
assertEquals(1L, groupby(c));
assertEquals(1L, having(c));
assertEquals(0, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testWhereLimitGroupByHavingOrderByQuery() {
Counters c = sql("SELECT languages FROM test WHERE languages > 2 GROUP BY languages HAVING MAX(languages) > 3"
+ " ORDER BY languages LIMIT 5");
assertEquals(1L, where(c));
assertEquals(1L, limit(c));
assertEquals(1L, groupby(c));
assertEquals(1L, having(c));
assertEquals(1L, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testTwoQueriesExecuted() {
Metrics metrics = new Metrics();
Verifier verifier = new Verifier(metrics);
sqlWithVerifier("SELECT languages FROM test WHERE languages > 2 GROUP BY languages LIMIT 5", verifier);
sqlWithVerifier("SELECT languages FROM test WHERE languages > 2 GROUP BY languages HAVING MAX(languages) > 3 "
+ "ORDER BY languages LIMIT 5", verifier);
Counters c = metrics.stats();
assertEquals(2L, where(c));
assertEquals(2L, limit(c));
assertEquals(2L, groupby(c));
assertEquals(1L, having(c));
assertEquals(1L, orderby(c));
assertEquals(0, command(c));
assertEquals(0, local(c));
}
public void testTwoCommandsExecuted() {
String command1 = randomFrom(commands);
Metrics metrics = new Metrics();
Verifier verifier = new Verifier(metrics);
sqlWithVerifier(command1, verifier);
sqlWithVerifier(randomValueOtherThan(command1, () -> randomFrom(commands)), verifier);
Counters c = metrics.stats();
assertEquals(0, where(c));
assertEquals(0, limit(c));
assertEquals(0, groupby(c));
assertEquals(0, having(c));
assertEquals(0, orderby(c));
assertEquals(2, command(c));
assertEquals(0, local(c));
}
private long where(Counters c) {
return c.get(FPREFIX + WHERE);
}
private long groupby(Counters c) {
return c.get(FPREFIX + GROUPBY);
}
private long limit(Counters c) {
return c.get(FPREFIX + LIMIT);
}
private long local(Counters c) {
return c.get(FPREFIX + LOCAL);
}
private long having(Counters c) {
return c.get(FPREFIX + HAVING);
}
private long orderby(Counters c) {
return c.get(FPREFIX + ORDERBY);
}
private long command(Counters c) {
return c.get(FPREFIX + COMMAND);
}
private Counters sql(String sql) {
return sql(sql, null);
}
private void sqlWithVerifier(String sql, Verifier verifier) {
sql(sql, verifier);
}
private Counters sql(String sql, Verifier v) {
Map<String, EsField> mapping = TypesTests.loadMapping("mapping-basic.json");
EsIndex test = new EsIndex("test", mapping);
Verifier verifier = v;
Metrics metrics = null;
if (v == null) {
metrics = new Metrics();
verifier = new Verifier(metrics);
}
Analyzer analyzer = new Analyzer(new FunctionRegistry(), IndexResolution.valid(test), TimeZone.getTimeZone("UTC"), verifier);
analyzer.analyze(parser.createStatement(sql), true);
return metrics == null ? null : metrics.stats();
}
}