mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
SQL: Introduce PreAnalyze phase to resolve catalogs async (elastic/x-pack-elasticsearch#2962)
SQL: Introduce PreAnalyze phase to resolve catalogs async The new preanalyze phase collects all unresolved relations and tries to resolve them as indices through typical async calls _before_ starting the analysis process. The result is loaded into a catalog which is then passed to the analyzer. While at it, the analyzer was made singleton and state across the engine is done through SqlSession#currentContext(). Commit missing fix Fix typo Fix license Fix line length remove redundant static modifier Remove redundant generics type Rename catalogResolver instance member to indexResolver Fix translate action to return a response through the listener, it hangs otherwise IndexResolver improvements Make sure that get index requests calls are locally executed by providing local flag. Don't replace index/alias name with concrete index name in asCatalog response conversion. We need to preserve the original alias name for security, so it is reused in the subsequent search. Update roles and actions names for security tests Get index is now executed instead of sql get indices, and sql get indices has been removed. Also made cluster privileges more restrictive to make sure that cluster state calls are no longer executed. Fix most of the security IT tests indices options are now unified, always lenient. The only situation where we get authorization exception back is when the user is not authorized for the sql action (besides for which indices). Improve SessionContext handling Fix context being invalid in non-executable phases Make Explain & Debug command fully async Resolve checkstyle error about redundant modifiers Temporarily restore SqlGetIndicesAction SqlGetIndicesAction action is still needed in RestSqlJdbcAction (metaTable and metaColumn methods), where we can't at the moment call IndexResolver directly, as security (FLS) needs index resolver to be called as part of the execution of an indices action. Once mappings are returned filtered, delayed action and the security filter will go away, as well as SqlGetIndicesAction. SqlGetIndicesAction doesn't need to be a delayed action, my bad [TEST] remove unused expectSqlWithAsyncLookup and rename expectSqlWithSyncLookup to expectSqlCompositeAction Polish and feedback Add unit test for PreAnalyzer Original commit: elastic/x-pack-elasticsearch@57846ed613
This commit is contained in:
parent
6fceb2fdde
commit
7cab29760d
@ -84,8 +84,8 @@ import org.elasticsearch.xpack.rest.action.RestXPackUsageAction;
|
||||
import org.elasticsearch.xpack.security.Security;
|
||||
import org.elasticsearch.xpack.security.authc.AuthenticationService;
|
||||
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
|
||||
import org.elasticsearch.xpack.sql.SecurityCatalogFilter;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlPlugin;
|
||||
import org.elasticsearch.xpack.ssl.SSLConfigurationReloader;
|
||||
@ -93,6 +93,7 @@ import org.elasticsearch.xpack.ssl.SSLService;
|
||||
import org.elasticsearch.xpack.upgrade.Upgrade;
|
||||
import org.elasticsearch.xpack.watcher.Watcher;
|
||||
|
||||
import javax.security.auth.DestroyFailedException;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.security.AccessController;
|
||||
@ -112,10 +113,6 @@ import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.security.auth.DestroyFailedException;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.Watcher.ENCRYPT_SENSITIVE_DATA_SETTING;
|
||||
|
||||
public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin, DiscoveryPlugin {
|
||||
|
||||
public static final String NAME = "x-pack";
|
||||
@ -304,7 +301,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
|
||||
/* Note that we need *client*, not *internalClient* because client preserves the
|
||||
* authenticated user while internalClient throws that user away and acts as the
|
||||
* x-pack user. */
|
||||
components.addAll(sql.createComponents(client, clusterService, securityCatalogFilter));
|
||||
components.addAll(sql.createComponents(client, securityCatalogFilter));
|
||||
|
||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
|
||||
PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService);
|
||||
|
@ -66,8 +66,8 @@ import org.elasticsearch.xpack.security.user.SystemUser;
|
||||
import org.elasticsearch.xpack.security.user.User;
|
||||
import org.elasticsearch.xpack.security.user.XPackSecurityUser;
|
||||
import org.elasticsearch.xpack.security.user.XPackUser;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlTranslateAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -75,7 +75,7 @@ public class SecurityCatalogFilter implements FilteredCatalog.Filter {
|
||||
filteredMapping.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return GetIndexResult.valid(new EsIndex(index.name(), filteredMapping, index.aliases(), index.settings()));
|
||||
return GetIndexResult.valid(new EsIndex(index.name(), filteredMapping));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,7 +20,7 @@ public class CliExplainIT extends CliIntegrationTestCase {
|
||||
assertThat(readLine(), startsWith("----------"));
|
||||
assertThat(readLine(), startsWith("With[{}]"));
|
||||
assertThat(readLine(), startsWith("\\_Project[[?*]]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null,Unknown index [test]]"));
|
||||
assertEquals("[0m", readLine());
|
||||
|
||||
assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test"), containsString("plan"));
|
||||
@ -59,7 +59,7 @@ public class CliExplainIT extends CliIntegrationTestCase {
|
||||
assertThat(readLine(), startsWith("With[{}]"));
|
||||
assertThat(readLine(), startsWith("\\_Project[[?*]]"));
|
||||
assertThat(readLine(), startsWith(" \\_Filter[?i = 2]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null,Unknown index [test]]"));
|
||||
assertEquals("[0m", readLine());
|
||||
|
||||
assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test WHERE i = 2"),
|
||||
@ -110,7 +110,7 @@ public class CliExplainIT extends CliIntegrationTestCase {
|
||||
assertThat(readLine(), startsWith("----------"));
|
||||
assertThat(readLine(), startsWith("With[{}]"));
|
||||
assertThat(readLine(), startsWith("\\_Project[[?COUNT(?*)]]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null]"));
|
||||
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[index=test],null,Unknown index [test]]"));
|
||||
assertEquals("[0m", readLine());
|
||||
|
||||
assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT COUNT(*) FROM test"),
|
||||
|
@ -1,44 +1,44 @@
|
||||
read_all:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: test
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
- names: bort
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
|
||||
read_something_else:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: something_that_isnt_test
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
|
||||
read_test_a:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: test
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
field_security:
|
||||
grant: [a]
|
||||
|
||||
read_test_a_and_b:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: test
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
field_security:
|
||||
grant: ["*"]
|
||||
except: [c]
|
||||
|
||||
read_test_without_c_3:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: test
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
query: |
|
||||
{
|
||||
"bool": {
|
||||
@ -54,7 +54,7 @@ read_test_without_c_3:
|
||||
|
||||
read_bort:
|
||||
cluster:
|
||||
- monitor # Used by JDBC's MetaData
|
||||
- "cluster:monitor/main" # Used by JDBC's MetaData
|
||||
indices:
|
||||
- names: bort
|
||||
privileges: [read]
|
||||
privileges: [read, "indices:admin/get"]
|
||||
|
@ -8,16 +8,16 @@ package org.elasticsearch.xpack.qa.sql.security;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.xpack.qa.sql.cli.RemoteCli;
|
||||
|
||||
import static org.elasticsearch.xpack.qa.sql.cli.CliIntegrationTestCase.elasticsearchAddress;
|
||||
import static org.hamcrest.Matchers.both;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.qa.sql.cli.CliIntegrationTestCase.elasticsearchAddress;
|
||||
import static org.hamcrest.Matchers.both;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class CliSecurityIT extends SqlSecurityTestCase {
|
||||
static String adminEsUrlPrefix() {
|
||||
return "test_admin:x-pack-test-password@";
|
||||
@ -107,6 +107,14 @@ public class CliSecurityIT extends SqlSecurityTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void expectUnknownIndex(String user, String sql) throws Exception {
|
||||
try (RemoteCli cli = new RemoteCli(userPrefix(user) + elasticsearchAddress())) {
|
||||
assertThat(cli.command(sql), containsString("Bad request"));
|
||||
assertThat(cli.readLine(), containsString("Unknown index"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void expectForbidden(String user, String sql) throws Exception {
|
||||
try (RemoteCli cli = new RemoteCli(userPrefix(user) + elasticsearchAddress())) {
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.qa.sql.security;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.xpack.qa.sql.jdbc.LocalH2;
|
||||
@ -58,12 +59,20 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
static void expectActionForbidden(String user, CheckedConsumer<Connection, SQLException> action) throws Exception {
|
||||
static void expectForbidden(String user, CheckedConsumer<Connection, SQLException> action) throws Exception {
|
||||
expectError(user, action, "is unauthorized for user [" + user + "]");
|
||||
}
|
||||
|
||||
static void expectUnknownIndex(String user, CheckedConsumer<Connection, SQLException> action) throws Exception {
|
||||
expectError(user, action, "Unknown index");
|
||||
}
|
||||
|
||||
static void expectError(String user, CheckedConsumer<Connection, SQLException> action, String errorMessage) throws Exception {
|
||||
SQLException e;
|
||||
try (Connection connection = es(userProperties(user))) {
|
||||
e = expectThrows(SQLException.class, () -> action.accept(connection));
|
||||
}
|
||||
assertThat(e.getMessage(), containsString("is unauthorized for user [" + user + "]"));
|
||||
assertThat(e.getMessage(), containsString(errorMessage));
|
||||
}
|
||||
|
||||
static void expectActionThrowsUnknownColumn(String user,
|
||||
@ -118,18 +127,20 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
Connection es = es(userProperties(user))) {
|
||||
// h2 doesn't have the same sort of DESCRIBE that we have so we emulate it
|
||||
h2.createStatement().executeUpdate("CREATE TABLE mock (column VARCHAR, type VARCHAR)");
|
||||
StringBuilder insert = new StringBuilder();
|
||||
insert.append("INSERT INTO mock (column, type) VALUES ");
|
||||
boolean first = true;
|
||||
for (Map.Entry<String, String> column : columns.entrySet()) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
insert.append(", ");
|
||||
if (columns.size() > 0) {
|
||||
StringBuilder insert = new StringBuilder();
|
||||
insert.append("INSERT INTO mock (column, type) VALUES ");
|
||||
boolean first = true;
|
||||
for (Map.Entry<String, String> column : columns.entrySet()) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
insert.append(", ");
|
||||
}
|
||||
insert.append("('").append(column.getKey()).append("', '").append(column.getValue()).append("')");
|
||||
}
|
||||
insert.append("('").append(column.getKey()).append("', '").append(column.getValue()).append("')");
|
||||
h2.createStatement().executeUpdate(insert.toString());
|
||||
}
|
||||
h2.createStatement().executeUpdate(insert.toString());
|
||||
|
||||
ResultSet expected = h2.createStatement().executeQuery("SELECT * FROM mock");
|
||||
assertResultSets(expected, es.createStatement().executeQuery("DESCRIBE test"));
|
||||
@ -162,7 +173,12 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
|
||||
@Override
|
||||
public void expectForbidden(String user, String sql) throws Exception {
|
||||
expectActionForbidden(user, con -> con.createStatement().executeQuery(sql));
|
||||
JdbcSecurityIT.expectForbidden(user, con -> con.createStatement().executeQuery(sql));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void expectUnknownIndex(String user, String sql) throws Exception {
|
||||
JdbcSecurityIT.expectUnknownIndex(user, con -> con.createStatement().executeQuery(sql));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -187,18 +203,18 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"full_access",
|
||||
con -> con.getMetaData().getTables("%", "%", "%", null));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("bort", "test"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "full_access", contains("bort", "test"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("bort", "test"))
|
||||
.expect(true, GetIndexAction.NAME, "full_access", contains("bort", "test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
public void testMetaDataGetTablesWithNoAccess() throws Exception {
|
||||
createUser("no_access", "read_nothing");
|
||||
|
||||
expectActionForbidden("no_access", con -> con.getMetaData().getTables("%", "%", "%", null));
|
||||
expectForbidden("no_access", con -> con.getMetaData().getTables("%", "%", "%", null));
|
||||
new AuditLogAsserter()
|
||||
// TODO figure out why this generates *no* logs
|
||||
// .expect(false, SQL_INDICES_ACTION_NAME, "no_access", contains("bort", "test"))
|
||||
// .expect(false, GetIndexAction.NAME, "no_access", contains("bort", "test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -210,8 +226,8 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"read_bort",
|
||||
con -> con.getMetaData().getTables("%", "%", "%", null));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("bort"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "read_bort", contains("bort"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("bort"))
|
||||
.expect(true, GetIndexAction.NAME, "read_bort", contains("bort"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -223,8 +239,8 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"read_bort",
|
||||
con -> con.getMetaData().getTables("%", "%", "test", null));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "read_bort", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "read_bort", contains("*", "-*"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -236,18 +252,18 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"full_access",
|
||||
con -> con.getMetaData().getColumns("%", "%", "%", "%"));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("bort", "test"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "full_access", contains("bort", "test"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("bort", "test"))
|
||||
.expect(true, GetIndexAction.NAME, "full_access", contains("bort", "test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
public void testMetaDataGetColumnsWithNoAccess() throws Exception {
|
||||
createUser("no_access", "read_nothing");
|
||||
|
||||
expectActionForbidden("no_access", con -> con.getMetaData().getColumns("%", "%", "%", "%"));
|
||||
expectForbidden("no_access", con -> con.getMetaData().getColumns("%", "%", "%", "%"));
|
||||
new AuditLogAsserter()
|
||||
// TODO figure out why this generates *no* logs
|
||||
// .expect(false, SQL_INDICES_ACTION_NAME, "no_access", contains("bort", "test"))
|
||||
// .expect(false, GetIndexAction.NAME, "no_access", contains("bort", "test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -259,8 +275,8 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"wrong_access",
|
||||
con -> con.getMetaData().getColumns("%", "%", "test", "%"));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "wrong_access", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "wrong_access", contains("*", "-*"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -272,8 +288,8 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"only_a",
|
||||
con -> con.getMetaData().getColumns("%", "%", "test", "%"));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("test"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "only_a", contains("test"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("test"))
|
||||
.expect(true, GetIndexAction.NAME, "only_a", contains("test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -294,7 +310,7 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
assertFalse(result.next());
|
||||
}
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "not_c", contains("test"))
|
||||
.expect(true, GetIndexAction.NAME, "not_c", contains("test"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -306,8 +322,8 @@ public class JdbcSecurityIT extends SqlSecurityTestCase {
|
||||
"no_3s",
|
||||
con -> con.getMetaData().getColumns("%", "%", "test", "%"));
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("test"))
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "no_3s", contains("test"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("test"))
|
||||
.expect(true, GetIndexAction.NAME, "no_3s", contains("test"))
|
||||
.assertLogs();
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class RestSqlSecurityIT extends SqlSecurityTestCase {
|
||||
private static class RestActions implements Actions {
|
||||
@ -111,7 +112,15 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
|
||||
@Override
|
||||
public void expectForbidden(String user, String sql) {
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> runSql(user, sql));
|
||||
assertThat(e.getMessage(), containsString("403 Forbidden"));
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(403));
|
||||
assertThat(e.getMessage(), containsString("unauthorized"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void expectUnknownIndex(String user, String sql) {
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> runSql(user, sql));
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
|
||||
assertThat(e.getMessage(), containsString("Unknown index"));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -171,7 +180,7 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
|
||||
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
|
||||
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expect(true, SQL_ACTION_NAME, "full_access", empty())
|
||||
// One scroll access denied per shard
|
||||
.expect(false, SQL_ACTION_NAME, "full_access", empty(), "InternalScrollSearchRequest")
|
||||
|
@ -9,6 +9,8 @@ import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.lucene.util.SuppressForbidden;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -18,6 +20,7 @@ import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -61,13 +64,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
void expectScrollMatchesAdmin(String adminSql, String user, String userSql) throws Exception;
|
||||
void expectDescribe(Map<String, String> columns, String user) throws Exception;
|
||||
void expectShowTables(List<String> tables, String user) throws Exception;
|
||||
|
||||
void expectForbidden(String user, String sql) throws Exception;
|
||||
void expectUnknownIndex(String user, String sql) throws Exception;
|
||||
void expectUnknownColumn(String user, String sql, String column) throws Exception;
|
||||
}
|
||||
|
||||
protected static final String SQL_ACTION_NAME = "indices:data/read/sql";
|
||||
protected static final String SQL_INDICES_ACTION_NAME = "indices:data/read/sql/indices";
|
||||
/**
|
||||
* Location of the audit log file. We could technically figure this out by reading the admin
|
||||
* APIs but it isn't worth doing because we also have to give ourselves permission to read
|
||||
@ -182,7 +184,7 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
public void testQueryWorksAsAdmin() throws Exception {
|
||||
actions.queryWorksAsAdmin();
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -191,8 +193,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("SELECT * FROM test ORDER BY a", "full_access", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlWithSyncLookup("full_access", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("full_access", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -201,12 +203,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectScrollMatchesAdmin("SELECT * FROM test ORDER BY a", "full_access", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
/* Scrolling doesn't have to access the index again, at least not through sql.
|
||||
* If we asserted query and scroll logs then we would see the scoll. */
|
||||
* If we asserted query and scroll logs then we would see the scroll. */
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expectSqlWithSyncLookup("full_access", "test")
|
||||
.expectSqlCompositeAction("full_access", "test")
|
||||
.expect(true, SQL_ACTION_NAME, "full_access", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "full_access", empty())
|
||||
.assertLogs();
|
||||
@ -224,14 +226,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
public void testQueryWrongAccess() throws Exception {
|
||||
createUser("wrong_access", "read_something_else");
|
||||
|
||||
actions.expectForbidden("wrong_access", "SELECT * FROM test");
|
||||
actions.expectUnknownIndex("wrong_access", "SELECT * FROM test");
|
||||
new AuditLogAsserter()
|
||||
/* This user has permission to run sql queries so they are
|
||||
* given preliminary authorization. */
|
||||
//This user has permission to run sql queries so they are given preliminary authorization
|
||||
.expect(true, SQL_ACTION_NAME, "wrong_access", empty())
|
||||
/* But as soon as they attempt to resolve an index that
|
||||
* they don't have access to they get denied. */
|
||||
.expect(false, SQL_ACTION_NAME, "wrong_access", hasItems("test"))
|
||||
//the following get index is granted too but against the no indices placeholder, as ignore_unavailable=true
|
||||
.expect(true, GetIndexAction.NAME, "wrong_access", hasItems("*", "-*"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -240,8 +240,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("SELECT a FROM test ORDER BY a", "only_a", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlWithSyncLookup("only_a", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("only_a", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -250,12 +250,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectScrollMatchesAdmin("SELECT a FROM test ORDER BY a", "only_a", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
/* Scrolling doesn't have to access the index again, at least not through sql.
|
||||
* If we asserted query and scroll logs then we would see the scoll. */
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expectSqlWithSyncLookup("only_a", "test")
|
||||
.expectSqlCompositeAction("only_a", "test")
|
||||
.expect(true, SQL_ACTION_NAME, "only_a", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "only_a", empty())
|
||||
.assertLogs();
|
||||
@ -272,7 +272,7 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
* out but it failed in SQL because it couldn't compile the
|
||||
* query without the metadata for the missing field. */
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("only_a", "test")
|
||||
.expectSqlCompositeAction("only_a", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -281,8 +281,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("SELECT a, b FROM test ORDER BY a", "not_c", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlWithSyncLookup("not_c", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("not_c", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -291,12 +291,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectScrollMatchesAdmin("SELECT a, b FROM test ORDER BY a", "not_c", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
/* Scrolling doesn't have to access the index again, at least not through sql.
|
||||
* If we asserted query and scroll logs then we would see the scoll. */
|
||||
* If we asserted query and scroll logs then we would see the scroll. */
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expectSqlWithSyncLookup("not_c", "test")
|
||||
.expectSqlCompositeAction("not_c", "test")
|
||||
.expect(true, SQL_ACTION_NAME, "not_c", empty())
|
||||
.expect(true, SQL_ACTION_NAME, "not_c", empty())
|
||||
.assertLogs();
|
||||
@ -313,24 +313,24 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
* out but it failed in SQL because it couldn't compile the
|
||||
* query without the metadata for the missing field. */
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("not_c", "test")
|
||||
.expectSqlCompositeAction("not_c", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
public void testQueryDocumentExclued() throws Exception {
|
||||
public void testQueryDocumentExcluded() throws Exception {
|
||||
createUser("no_3s", "read_test_without_c_3");
|
||||
|
||||
actions.expectMatchesAdmin("SELECT * FROM test WHERE c != 3 ORDER BY a", "no_3s", "SELECT * FROM test ORDER BY a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithSyncLookup("test_admin", "test")
|
||||
.expectSqlWithSyncLookup("no_3s", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("no_3s", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
public void testShowTablesWorksAsAdmin() throws Exception {
|
||||
actions.expectShowTables(Arrays.asList("bort", "test"), null);
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "bort", "test")
|
||||
.expectSqlCompositeAction("test_admin", "bort", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -339,8 +339,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("SHOW TABLES", "full_access", "SHOW TABLES");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "bort", "test")
|
||||
.expectSqlWithAsyncLookup("full_access", "bort", "test")
|
||||
.expectSqlCompositeAction("test_admin", "bort", "test")
|
||||
.expectSqlCompositeAction("full_access", "bort", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -358,8 +358,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("SHOW TABLES LIKE 'bort'", "read_bort", "SHOW TABLES");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "bort")
|
||||
.expectSqlWithAsyncLookup("read_bort", "bort")
|
||||
.expectSqlCompositeAction("test_admin", "bort")
|
||||
.expectSqlCompositeAction("read_bort", "bort")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -369,9 +369,9 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
actions.expectMatchesAdmin("SHOW TABLES LIKE 'not_created'", "read_bort", "SHOW TABLES LIKE 'test'");
|
||||
new AuditLogAsserter()
|
||||
.expect(true, SQL_ACTION_NAME, "test_admin", empty())
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "test_admin", contains("*", "-*"))
|
||||
.expect(true, SQL_ACTION_NAME, "read_bort", empty())
|
||||
.expect(true, SQL_INDICES_ACTION_NAME, "read_bort", contains("*", "-*"))
|
||||
.expect(true, GetIndexAction.NAME, "read_bort", contains("*", "-*"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -382,7 +382,7 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
expected.put("c", "BIGINT");
|
||||
actions.expectDescribe(expected, null);
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -391,8 +391,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectMatchesAdmin("DESCRIBE test", "full_access", "DESCRIBE test");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "test")
|
||||
.expectSqlWithAsyncLookup("full_access", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("full_access", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -408,14 +408,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
public void testDescribeWithWrongAccess() throws Exception {
|
||||
createUser("wrong_access", "read_something_else");
|
||||
|
||||
actions.expectForbidden("wrong_access", "DESCRIBE test");
|
||||
actions.expectDescribe(Collections.emptyMap(), "wrong_access");
|
||||
new AuditLogAsserter()
|
||||
/* This user has permission to run sql queries so they are
|
||||
* given preliminary authorization. */
|
||||
//This user has permission to run sql queries so they are given preliminary authorization
|
||||
.expect(true, SQL_ACTION_NAME, "wrong_access", empty())
|
||||
/* But as soon as they attempt to resolve an index that
|
||||
* they don't have access to they get denied. */
|
||||
.expect(false, SQL_INDICES_ACTION_NAME, "wrong_access", hasItems("test"))
|
||||
//the following get index is granted too but against the no indices placeholder, as ignore_unavailable=true
|
||||
.expect(true, GetIndexAction.NAME, "wrong_access", hasItems("*", "-*"))
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -424,7 +422,7 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
|
||||
actions.expectDescribe(singletonMap("a", "BIGINT"), "only_a");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("only_a", "test")
|
||||
.expectSqlCompositeAction("only_a", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
@ -436,21 +434,21 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
expected.put("b", "BIGINT");
|
||||
actions.expectDescribe(expected, "not_c");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("not_c", "test")
|
||||
.expectSqlCompositeAction("not_c", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
public void testDescribeDocumentExclued() throws Exception {
|
||||
public void testDescribeDocumentExcluded() throws Exception {
|
||||
createUser("no_3s", "read_test_without_c_3");
|
||||
|
||||
actions.expectMatchesAdmin("DESCRIBE test", "no_3s", "DESCRIBE test");
|
||||
new AuditLogAsserter()
|
||||
.expectSqlWithAsyncLookup("test_admin", "test")
|
||||
.expectSqlWithAsyncLookup("no_3s", "test")
|
||||
.expectSqlCompositeAction("test_admin", "test")
|
||||
.expectSqlCompositeAction("no_3s", "test")
|
||||
.assertLogs();
|
||||
}
|
||||
|
||||
protected final void createUser(String name, String role) throws IOException {
|
||||
protected static void createUser(String name, String role) throws IOException {
|
||||
XContentBuilder user = JsonXContent.contentBuilder().prettyPrint().startObject(); {
|
||||
user.field("password", "testpass");
|
||||
user.field("roles", role);
|
||||
@ -468,17 +466,12 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
protected final class AuditLogAsserter {
|
||||
private final List<Function<Map<String, Object>, Boolean>> logCheckers = new ArrayList<>();
|
||||
|
||||
public AuditLogAsserter expectSqlWithAsyncLookup(String user, String... indices) {
|
||||
expect(true, SQL_ACTION_NAME, user, empty());
|
||||
expect(true, SQL_INDICES_ACTION_NAME, user, contains(indices));
|
||||
return this;
|
||||
}
|
||||
|
||||
public AuditLogAsserter expectSqlWithSyncLookup(String user, String... indices) {
|
||||
public AuditLogAsserter expectSqlCompositeAction(String user, String... indices) {
|
||||
expect(true, SQL_ACTION_NAME, user, empty());
|
||||
for (String index : indices) {
|
||||
expect(true, SQL_ACTION_NAME, user, hasItems(index));
|
||||
}
|
||||
expect(true, GetIndexAction.NAME, user, hasItems(indices));
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -489,8 +482,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
case SQL_ACTION_NAME:
|
||||
request = "SqlRequest";
|
||||
break;
|
||||
case SQL_INDICES_ACTION_NAME:
|
||||
request = "Request";
|
||||
case GetIndexAction.NAME:
|
||||
request = GetIndexRequest.class.getSimpleName();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown action [" + action + "]");
|
||||
@ -559,7 +552,8 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
log.put("run_as_principal", m.group(i++));
|
||||
log.put("run_by_principal", m.group(i++));
|
||||
String action = m.group(i++);
|
||||
if (false == (SQL_ACTION_NAME.equals(action) || SQL_INDICES_ACTION_NAME.equals(action))) {
|
||||
if (false == (SQL_ACTION_NAME.equals(action) || GetIndexAction.NAME.equals(action))) {
|
||||
//TODO we may want to extend this and the assertions to SearchAction.NAME as well
|
||||
continue;
|
||||
}
|
||||
log.put("action", action);
|
||||
@ -574,7 +568,7 @@ public abstract class SqlSecurityTestCase extends ESRestTestCase {
|
||||
indices.remove(".security-6");
|
||||
}
|
||||
log.put("indices", indices);
|
||||
log.put("request", m.group(i++));
|
||||
log.put("request", m.group(i));
|
||||
logs.add(log);
|
||||
}
|
||||
List<Map<String, Object>> allLogs = new ArrayList<>(logs);
|
||||
|
@ -10,14 +10,9 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.FilterClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
|
||||
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
|
||||
@ -50,12 +45,6 @@ public class EmbeddedModeFilterClient extends FilterClient {
|
||||
|
||||
if (action == SqlAction.INSTANCE) {
|
||||
TransportSqlAction.operation(planExecutor, (SqlRequest) request, (ActionListener<SqlResponse>) listener);
|
||||
} else if (action == SqlGetIndicesAction.INSTANCE) {
|
||||
admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(response -> {
|
||||
SqlGetIndicesAction.operation(new IndexNameExpressionResolver(Settings.EMPTY), EsCatalog::new,
|
||||
(SqlGetIndicesAction.Request) request, response.getState(),
|
||||
(ActionListener<SqlGetIndicesAction.Response>) listener);
|
||||
}, listener::onFailure));
|
||||
} else {
|
||||
super.doExecute(action, request, listener);
|
||||
}
|
||||
|
@ -6,12 +6,12 @@
|
||||
package org.elasticsearch.xpack.qa.sql.embed;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.test.rest.FakeRestChannel;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
|
||||
import org.elasticsearch.xpack.sql.plugin.RestSqlJdbcAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
|
||||
@ -26,7 +26,8 @@ class JdbcProtoHandler extends ProtoHandler {
|
||||
|
||||
JdbcProtoHandler(Client client) {
|
||||
super(client);
|
||||
action = new RestSqlJdbcAction(Settings.EMPTY, mock(RestController.class), new SqlLicenseChecker(() -> {}, () -> {}));
|
||||
action = new RestSqlJdbcAction(Settings.EMPTY, mock(RestController.class), new SqlLicenseChecker(() -> {}, () -> {}),
|
||||
new IndexResolver(client, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -12,25 +12,21 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
|
||||
|
||||
public abstract class ProtoHandler implements HttpHandler, AutoCloseable {
|
||||
|
||||
private static PlanExecutor planExecutor(EmbeddedModeFilterClient client) {
|
||||
Supplier<ClusterState> clusterStateSupplier = () -> client.admin().cluster().prepareState().get(timeValueMinutes(1)).getState();
|
||||
return new PlanExecutor(client, clusterStateSupplier, EsCatalog::new);
|
||||
return new PlanExecutor(client, new IndexResolver(client, null));
|
||||
}
|
||||
|
||||
protected static final Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName());
|
||||
|
@ -7,7 +7,6 @@ package org.elasticsearch.xpack.sql.analysis.analyzer;
|
||||
|
||||
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier.Failure;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
|
||||
import org.elasticsearch.xpack.sql.capabilities.Resolvables;
|
||||
@ -78,11 +77,9 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
|
||||
public class Analyzer extends RuleExecutor<LogicalPlan> {
|
||||
|
||||
private final FunctionRegistry functionRegistry;
|
||||
private final Catalog catalog;
|
||||
|
||||
public Analyzer(FunctionRegistry functionRegistry, Catalog catalog) {
|
||||
public Analyzer(FunctionRegistry functionRegistry) {
|
||||
this.functionRegistry = functionRegistry;
|
||||
this.catalog = catalog;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -190,6 +187,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
|
||||
}
|
||||
|
||||
// too many references - should it be ignored?
|
||||
// TODO: move away from exceptions inside the analyzer
|
||||
if (!lenient) {
|
||||
throw new AnalysisException(u, "Reference %s is ambiguous, matches any of %s", u.nodeString(), matches);
|
||||
}
|
||||
@ -260,12 +258,12 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
|
||||
TableIdentifier table = plan.table();
|
||||
EsIndex found = null;
|
||||
|
||||
GetIndexResult index = catalog.getIndex(table.index());
|
||||
GetIndexResult index = SqlSession.currentContext().catalog.getIndex(table.index());
|
||||
if (index.isValid()) {
|
||||
found = index.get();
|
||||
}
|
||||
if (found == null) {
|
||||
return plan;
|
||||
} else {
|
||||
return plan.unresolvedMessage().equals(index.toString()) ? plan : new UnresolvedRelation(plan.location(), plan.table(),
|
||||
plan.alias(), index.toString());
|
||||
}
|
||||
|
||||
LogicalPlan catalogTable = new EsRelation(plan.location(), found);
|
||||
@ -699,7 +697,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
|
||||
return new UnresolvedFunction(uf.location(), uf.name(), uf.distinct(), uf.children(), true, message);
|
||||
}
|
||||
// TODO: look into Generator for significant terms, etc..
|
||||
Function f = functionRegistry.resolveFunction(uf, SqlSession.CURRENT_SETTINGS.get());
|
||||
Function f = functionRegistry.resolveFunction(uf, SqlSession.currentContext().configuration);
|
||||
|
||||
list.add(f);
|
||||
return f;
|
||||
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.analyzer;
|
||||
|
||||
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.UnresolvedRelation;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
// Since the pre-analyzer only inspect (and does NOT transform) the tree
|
||||
// it is not built as a rule executor.
|
||||
// Further more it applies 'the rules' only once and needs to return some
|
||||
// state back.
|
||||
public class PreAnalyzer {
|
||||
|
||||
public static class PreAnalysis {
|
||||
public static final PreAnalysis EMPTY = new PreAnalysis(emptyList());
|
||||
|
||||
public final List<String> indices;
|
||||
|
||||
PreAnalysis(List<String> indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
}
|
||||
|
||||
public PreAnalysis preAnalyze(LogicalPlan plan) {
|
||||
if (plan.analyzed()) {
|
||||
return PreAnalysis.EMPTY;
|
||||
}
|
||||
|
||||
return doPreAnalyze(plan);
|
||||
}
|
||||
|
||||
private PreAnalysis doPreAnalyze(LogicalPlan plan) {
|
||||
List<String> indices = new ArrayList<>();
|
||||
|
||||
plan.forEachUp(p -> indices.add(p.table().index()), UnresolvedRelation.class);
|
||||
|
||||
// mark plan as preAnalyzed (if it were marked, there would be no analysis)
|
||||
plan.forEachUp(LogicalPlan::setPreAnalyzed);
|
||||
|
||||
return new PreAnalysis(indices);
|
||||
}
|
||||
}
|
@ -15,6 +15,9 @@ import java.util.Objects;
|
||||
* into a representation that is compatible with SQL (@{link {@link EsIndex}).
|
||||
*/
|
||||
public interface Catalog {
|
||||
|
||||
Catalog EMPTY = GetIndexResult::notFound;
|
||||
|
||||
/**
|
||||
* Lookup the information for a table.
|
||||
*/
|
||||
@ -35,12 +38,11 @@ public interface Catalog {
|
||||
return invalid("Index '" + name + "' does not exist");
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private final EsIndex index;
|
||||
@Nullable
|
||||
private final String invalid;
|
||||
|
||||
private GetIndexResult(@Nullable EsIndex index, @Nullable String invalid) {
|
||||
private GetIndexResult(EsIndex index, @Nullable String invalid) {
|
||||
this.index = index;
|
||||
this.invalid = invalid;
|
||||
}
|
||||
@ -82,10 +84,7 @@ public interface Catalog {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (invalid != null) {
|
||||
return "GetIndexResult[" + invalid + "]";
|
||||
}
|
||||
return "GetIndexResult[" + index + "]";
|
||||
return invalid != null ? invalid : index.name();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,74 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.xpack.sql.SqlException;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.type.Types;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class EsCatalog implements Catalog {
|
||||
private final ClusterState clusterState;
|
||||
|
||||
public EsCatalog(ClusterState clusterState) {
|
||||
this.clusterState = clusterState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetIndexResult getIndex(String index) throws SqlException {
|
||||
IndexMetaData idx = clusterState.getMetaData().index(index);
|
||||
if (idx == null) {
|
||||
return GetIndexResult.notFound(index);
|
||||
}
|
||||
if (idx.getIndex().getName().startsWith(".")) {
|
||||
/* Indices that start with "." are considered internal and
|
||||
* should not be available to SQL. */
|
||||
return GetIndexResult.invalid(
|
||||
"[" + idx.getIndex().getName() + "] starts with [.] so it is considered internal and incompatible with sql");
|
||||
}
|
||||
|
||||
// Make sure that the index contains only a single type
|
||||
MappingMetaData singleType = null;
|
||||
List<String> typeNames = null;
|
||||
for (ObjectObjectCursor<String, MappingMetaData> type : idx.getMappings()) {
|
||||
/* We actually ignore the _default_ mapping because it is still
|
||||
* allowed but deprecated. */
|
||||
if ("_default_".equals(type.key)) {
|
||||
continue;
|
||||
}
|
||||
if (singleType != null) {
|
||||
// There are more than one types
|
||||
if (typeNames == null) {
|
||||
typeNames = new ArrayList<>();
|
||||
typeNames.add(singleType.type());
|
||||
}
|
||||
typeNames.add(type.key);
|
||||
}
|
||||
singleType = type.value;
|
||||
}
|
||||
if (singleType == null) {
|
||||
return GetIndexResult.invalid("[" + idx.getIndex().getName() + "] doesn't have any types so it is incompatible with sql");
|
||||
}
|
||||
if (typeNames != null) {
|
||||
Collections.sort(typeNames);
|
||||
return GetIndexResult.invalid(
|
||||
"[" + idx.getIndex().getName() + "] contains more than one type " + typeNames + " so it is incompatible with sql");
|
||||
}
|
||||
Map<String, DataType> mapping = Types.fromEs(singleType.sourceAsMap());
|
||||
List<String> aliases = Arrays.asList(idx.getAliases().keys().toArray(String.class));
|
||||
return GetIndexResult.valid(new EsIndex(idx.getIndex().getName(), mapping, aliases, idx.getSettings()));
|
||||
}
|
||||
}
|
@ -5,32 +5,23 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.util.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class EsIndex {
|
||||
|
||||
public static final EsIndex NOT_FOUND = new EsIndex(StringUtils.EMPTY, emptyMap(), emptyList(), Settings.EMPTY);
|
||||
public static final EsIndex NOT_FOUND = new EsIndex(StringUtils.EMPTY, emptyMap());
|
||||
|
||||
// TODO Double check that we need these and that we're securing them properly.
|
||||
// Tracked by https://github.com/elastic/x-pack-elasticsearch/issues/3076
|
||||
private final String name;
|
||||
private final Map<String, DataType> mapping;
|
||||
private final List<String> aliases;
|
||||
private final Settings settings;
|
||||
|
||||
public EsIndex(String name, Map<String, DataType> mapping, List<String> aliases, Settings settings) {
|
||||
public EsIndex(String name, Map<String, DataType> mapping) {
|
||||
this.name = name;
|
||||
this.mapping = mapping;
|
||||
this.aliases = aliases;
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
public String name() {
|
||||
@ -41,14 +32,6 @@ public class EsIndex {
|
||||
return mapping;
|
||||
}
|
||||
|
||||
public List<String> aliases() {
|
||||
return aliases;
|
||||
}
|
||||
|
||||
public Settings settings() {
|
||||
return settings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest.Feature;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.type.Types;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class IndexResolver {
|
||||
|
||||
private final Client client;
|
||||
private final FilteredCatalog.Filter catalogFilter;
|
||||
|
||||
public IndexResolver(Client client, FilteredCatalog.Filter catalogFilter) {
|
||||
this.client = client;
|
||||
this.catalogFilter = catalogFilter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves a single catalog by name.
|
||||
*/
|
||||
public void asCatalog(final String index, ActionListener<Catalog> listener) {
|
||||
GetIndexRequest getIndexRequest = createGetIndexRequest(index);
|
||||
client.admin().indices().getIndex(getIndexRequest, ActionListener.wrap(getIndexResponse -> {
|
||||
Map<String, GetIndexResult> results = new HashMap<>();
|
||||
if (getIndexResponse.getMappings().size() > 1) {
|
||||
results.put(index, GetIndexResult.invalid(
|
||||
"[" + index + "] is an alias pointing to more than one index which is currently incompatible with sql"));
|
||||
} else if (getIndexResponse.getMappings().size() == 1){
|
||||
ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings =
|
||||
getIndexResponse.getMappings().iterator().next();
|
||||
String concreteIndex = indexMappings.key;
|
||||
/*
|
||||
* here we don't support wildcards: we can either have an alias or an index. However names get resolved (through
|
||||
* security or not) we need to preserve the original names as they will be used in the subsequent search request.
|
||||
* With security enabled, if the user is authorized for an alias and not its corresponding concrete index, we have to
|
||||
* make sure that the search is executed against the same alias name from the original command, rather than
|
||||
* the resolved concrete index that we get back from the get index API
|
||||
*/
|
||||
results.put(index, buildGetIndexResult(concreteIndex, index, indexMappings.value));
|
||||
}
|
||||
Catalog catalog = new PreloadedCatalog(results);
|
||||
catalog = catalogFilter != null ? new FilteredCatalog(catalog, catalogFilter) : catalog;
|
||||
listener.onResponse(catalog);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover (multiple) matching indices for a given name.
|
||||
*/
|
||||
//TODO this method can take a single index pattern once SqlGetIndicesAction is removed
|
||||
public void asList(ActionListener<List<EsIndex>> listener, String... indices) {
|
||||
GetIndexRequest getIndexRequest = createGetIndexRequest(indices);
|
||||
client.admin().indices().getIndex(getIndexRequest, ActionListener.wrap(getIndexResponse -> {
|
||||
Map<String, GetIndexResult> map = new HashMap<>();
|
||||
for (ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings : getIndexResponse.getMappings()) {
|
||||
/*
|
||||
* We support wildcard expressions here, and it's only for commands that only perform the get index call.
|
||||
* We can and simply have to use the concrete index name and show that to users.
|
||||
* Get index against an alias with security enabled, where the user has only access to get mappings for the alias
|
||||
* and not the concrete index: there is a well known information leak of the concrete index name in the response.
|
||||
*/
|
||||
String concreteIndex = indexMappings.key;
|
||||
map.put(concreteIndex, buildGetIndexResult(concreteIndex, concreteIndex, indexMappings.value));
|
||||
}
|
||||
List<EsIndex> results = new ArrayList<>(map.size());
|
||||
for (GetIndexResult result : map.values()) {
|
||||
if (result.isValid()) {
|
||||
//as odd as this is, it will go away once mappings are returned filtered
|
||||
GetIndexResult filtered = catalogFilter != null ? catalogFilter.filterIndex(result) : result;
|
||||
results.add(filtered.get());
|
||||
}
|
||||
}
|
||||
results.sort(Comparator.comparing(EsIndex::name));
|
||||
listener.onResponse(results);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
private static GetIndexRequest createGetIndexRequest(String... indices) {
|
||||
return new GetIndexRequest()
|
||||
.local(true)
|
||||
.indices(indices)
|
||||
.features(Feature.MAPPINGS)
|
||||
//lenient because we throw our own errors looking at the response e.g. if something was not resolved
|
||||
//also because this way security doesn't throw authorization exceptions but rather honours ignore_unavailable
|
||||
.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
}
|
||||
|
||||
private static GetIndexResult buildGetIndexResult(String concreteIndex, String indexOrAlias,
|
||||
ImmutableOpenMap<String, MappingMetaData> mappings) {
|
||||
if (concreteIndex.startsWith(".")) {
|
||||
//Indices that start with "." are considered internal and should not be available to SQL
|
||||
return GetIndexResult.notFound(indexOrAlias);
|
||||
}
|
||||
|
||||
// Make sure that the index contains only a single type
|
||||
MappingMetaData singleType = null;
|
||||
List<String> typeNames = null;
|
||||
for (ObjectObjectCursor<String, MappingMetaData> type : mappings) {
|
||||
//Default mappings are ignored as they are applied to each type. Each type alone holds all of its fields.
|
||||
if ("_default_".equals(type.key)) {
|
||||
continue;
|
||||
}
|
||||
if (singleType != null) {
|
||||
// There are more than one types
|
||||
if (typeNames == null) {
|
||||
typeNames = new ArrayList<>();
|
||||
typeNames.add(singleType.type());
|
||||
}
|
||||
typeNames.add(type.key);
|
||||
}
|
||||
singleType = type.value;
|
||||
}
|
||||
|
||||
if (singleType == null) {
|
||||
return GetIndexResult.invalid("[" + indexOrAlias + "] doesn't have any types so it is incompatible with sql");
|
||||
} else if (typeNames != null) {
|
||||
Collections.sort(typeNames);
|
||||
return GetIndexResult.invalid(
|
||||
"[" + indexOrAlias + "] contains more than one type " + typeNames + " so it is incompatible with sql");
|
||||
} else {
|
||||
Map<String, DataType> mapping = Types.fromEs(singleType.sourceAsMap());
|
||||
return GetIndexResult.valid(new EsIndex(indexOrAlias, mapping));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class PreloadedCatalog implements Catalog {
|
||||
|
||||
private final Map<String, GetIndexResult> map;
|
||||
|
||||
public PreloadedCatalog(Map<String, GetIndexResult> map) {
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetIndexResult getIndex(String index) {
|
||||
return map.getOrDefault(index, GetIndexResult.notFound(index));
|
||||
}
|
||||
}
|
@ -7,16 +7,16 @@ package org.elasticsearch.xpack.sql.execution;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SourceGenerator;
|
||||
import org.elasticsearch.xpack.sql.expression.function.DefaultFunctionRegistry;
|
||||
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
|
||||
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
|
||||
import org.elasticsearch.xpack.sql.parser.SqlParser;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.sql.planner.Planner;
|
||||
import org.elasticsearch.xpack.sql.planner.PlanningException;
|
||||
import org.elasticsearch.xpack.sql.session.Configuration;
|
||||
@ -25,64 +25,50 @@ import org.elasticsearch.xpack.sql.session.RowSet;
|
||||
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
|
||||
import org.elasticsearch.xpack.sql.session.SqlSession;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class PlanExecutor {
|
||||
private final Client client;
|
||||
private final Supplier<ClusterState> stateSupplier;
|
||||
private final Function<ClusterState, Catalog> catalogSupplier;
|
||||
|
||||
private final FunctionRegistry functionRegistry;
|
||||
|
||||
private final SqlParser parser;
|
||||
private final FunctionRegistry functionRegistry;
|
||||
private final IndexResolver catalogResolver;
|
||||
private final PreAnalyzer preAnalyzer;
|
||||
private final Analyzer analyzer;
|
||||
private final Optimizer optimizer;
|
||||
private final Planner planner;
|
||||
|
||||
public PlanExecutor(Client client, Supplier<ClusterState> stateSupplier,
|
||||
Function<ClusterState, Catalog> catalogSupplier) {
|
||||
public PlanExecutor(Client client, IndexResolver catalogResolver) {
|
||||
this.client = client;
|
||||
this.stateSupplier = stateSupplier;
|
||||
this.catalogSupplier = catalogSupplier;
|
||||
|
||||
this.parser = new SqlParser();
|
||||
this.catalogResolver = catalogResolver;
|
||||
this.functionRegistry = new DefaultFunctionRegistry();
|
||||
|
||||
this.parser = new SqlParser();
|
||||
this.preAnalyzer = new PreAnalyzer();
|
||||
this.analyzer = new Analyzer(functionRegistry);
|
||||
this.optimizer = new Optimizer();
|
||||
this.planner = new Planner();
|
||||
}
|
||||
|
||||
public SqlSession newSession(Configuration cfg) {
|
||||
Catalog catalog = catalogSupplier.apply(stateSupplier.get());
|
||||
return new SqlSession(cfg, client, catalog, functionRegistry, parser, optimizer, planner);
|
||||
private SqlSession newSession(Configuration cfg) {
|
||||
return new SqlSession(cfg, client, functionRegistry, parser, catalogResolver, preAnalyzer, analyzer, optimizer, planner);
|
||||
}
|
||||
|
||||
|
||||
public SearchSourceBuilder searchSource(String sql, Configuration cfg) {
|
||||
PhysicalPlan executable = newSession(cfg).executable(sql);
|
||||
if (executable instanceof EsQueryExec) {
|
||||
EsQueryExec e = (EsQueryExec) executable;
|
||||
return SourceGenerator.sourceBuilder(e.queryContainer(), cfg.filter(), cfg.pageSize());
|
||||
}
|
||||
else {
|
||||
throw new PlanningException("Cannot generate a query DSL for %s", sql);
|
||||
}
|
||||
}
|
||||
|
||||
public void sql(String sql, ActionListener<SchemaRowSet> listener) {
|
||||
sql(Configuration.DEFAULT, sql, listener);
|
||||
public void searchSource(String sql, Configuration settings, ActionListener<SearchSourceBuilder> listener) {
|
||||
newSession(settings).sqlExecutable(sql, ActionListener.wrap(exec -> {
|
||||
if (exec instanceof EsQueryExec) {
|
||||
EsQueryExec e = (EsQueryExec) exec;
|
||||
listener.onResponse(SourceGenerator.sourceBuilder(e.queryContainer(), settings.filter(), settings.pageSize()));
|
||||
} else {
|
||||
listener.onFailure(new PlanningException("Cannot generate a query DSL for %s", sql));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
public void sql(Configuration cfg, String sql, ActionListener<SchemaRowSet> listener) {
|
||||
SqlSession session = newSession(cfg);
|
||||
try {
|
||||
PhysicalPlan executable = session.executable(sql);
|
||||
executable.execute(session, listener);
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
newSession(cfg).sql(sql, listener);
|
||||
}
|
||||
|
||||
public void nextPage(Configuration cfg, Cursor cursor, ActionListener<RowSet> listener) {
|
||||
cursor.nextPage(cfg, client, listener);
|
||||
}
|
||||
}
|
||||
}
|
@ -11,6 +11,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
//TODO: this class is thread-safe but used across multiple sessions might cause the id to roll over and potentially generate an already assigned id
|
||||
// making this session scope would simplify things
|
||||
// (which also begs the question on whether thread-safety is needed than)
|
||||
|
||||
// TODO: hook this into SqlSession#SessionContext
|
||||
public class ExpressionIdGenerator {
|
||||
|
||||
private static final AtomicLong GLOBAL_ID = new AtomicLong();
|
||||
|
@ -110,7 +110,7 @@ public enum Extract {
|
||||
};
|
||||
|
||||
public DateTimeFunction toFunction(Location source, Expression argument) {
|
||||
return toFunction(source, argument, SqlSession.CURRENT_SETTINGS.get().timeZone());
|
||||
return toFunction(source, argument, SqlSession.currentContext().configuration.timeZone());
|
||||
}
|
||||
|
||||
public abstract DateTimeFunction toFunction(Location source, Expression argument, DateTimeZone timeZone);
|
||||
|
@ -79,6 +79,7 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
|
||||
|
||||
|
||||
public class Optimizer extends RuleExecutor<LogicalPlan> {
|
||||
|
||||
public ExecutionInfo debugOptimize(LogicalPlan verified) {
|
||||
return verified.optimized() ? null : executeWithInfo(verified);
|
||||
}
|
||||
|
@ -14,8 +14,17 @@ import java.util.List;
|
||||
|
||||
public abstract class LogicalPlan extends QueryPlan<LogicalPlan> implements Resolvable {
|
||||
|
||||
private boolean analyzed = false;
|
||||
private boolean optimized = false;
|
||||
/**
|
||||
* Order is important in the enum; any values should be added at the end.
|
||||
*/
|
||||
public enum Stage {
|
||||
PARSED,
|
||||
PRE_ANALYZED,
|
||||
ANALYZED,
|
||||
OPTIMIZED;
|
||||
}
|
||||
|
||||
private Stage stage = Stage.PARSED;
|
||||
private Boolean lazyChildrenResolved = null;
|
||||
private Boolean lazyResolved = null;
|
||||
|
||||
@ -23,20 +32,28 @@ public abstract class LogicalPlan extends QueryPlan<LogicalPlan> implements Reso
|
||||
super(location, children);
|
||||
}
|
||||
|
||||
public boolean preAnalyzed() {
|
||||
return stage.ordinal() >= Stage.PRE_ANALYZED.ordinal();
|
||||
}
|
||||
|
||||
public void setPreAnalyzed() {
|
||||
stage = Stage.PRE_ANALYZED;
|
||||
}
|
||||
|
||||
public boolean analyzed() {
|
||||
return analyzed;
|
||||
return stage.ordinal() >= Stage.ANALYZED.ordinal();
|
||||
}
|
||||
|
||||
public void setAnalyzed() {
|
||||
analyzed = true;
|
||||
stage = Stage.ANALYZED;
|
||||
}
|
||||
|
||||
public boolean optimized() {
|
||||
return optimized;
|
||||
return stage.ordinal() >= Stage.OPTIMIZED.ordinal();
|
||||
}
|
||||
|
||||
public void setOptimized() {
|
||||
optimized = true;
|
||||
stage = Stage.OPTIMIZED;
|
||||
}
|
||||
|
||||
public final boolean childrenResolved() {
|
||||
@ -61,5 +78,4 @@ public abstract class LogicalPlan extends QueryPlan<LogicalPlan> implements Reso
|
||||
|
||||
@Override
|
||||
public abstract boolean equals(Object obj);
|
||||
|
||||
}
|
@ -18,11 +18,17 @@ public class UnresolvedRelation extends LeafPlan implements Unresolvable {
|
||||
|
||||
private final TableIdentifier table;
|
||||
private final String alias;
|
||||
private final String unresolvedMsg;
|
||||
|
||||
public UnresolvedRelation(Location location, TableIdentifier table, String alias) {
|
||||
this(location, table, alias, null);
|
||||
}
|
||||
|
||||
public UnresolvedRelation(Location location, TableIdentifier table, String alias, String unresolvedMessage) {
|
||||
super(location);
|
||||
this.table = table;
|
||||
this.alias = alias;
|
||||
this.unresolvedMsg = unresolvedMessage == null ? "Unknown index [" + table.index() + "]" : unresolvedMessage;
|
||||
}
|
||||
|
||||
public TableIdentifier table() {
|
||||
@ -38,11 +44,6 @@ public class UnresolvedRelation extends LeafPlan implements Unresolvable {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String unresolvedMessage() {
|
||||
return "Unknown index [" + table.index() + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expressionsResolved() {
|
||||
return false;
|
||||
@ -53,6 +54,11 @@ public class UnresolvedRelation extends LeafPlan implements Unresolvable {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String unresolvedMessage() {
|
||||
return unresolvedMsg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(table);
|
||||
@ -71,4 +77,4 @@ public class UnresolvedRelation extends LeafPlan implements Unresolvable {
|
||||
UnresolvedRelation other = (UnresolvedRelation) obj;
|
||||
return Objects.equals(table, other.table);
|
||||
}
|
||||
}
|
||||
}
|
@ -12,7 +12,6 @@ import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch;
|
||||
import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo;
|
||||
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation;
|
||||
import org.elasticsearch.xpack.sql.session.RowSet;
|
||||
import org.elasticsearch.xpack.sql.session.Rows;
|
||||
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
|
||||
import org.elasticsearch.xpack.sql.session.SqlSession;
|
||||
@ -29,6 +28,7 @@ import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.action.ActionListener.wrap;
|
||||
|
||||
public class Debug extends Command {
|
||||
|
||||
@ -68,32 +68,30 @@ public class Debug extends Command {
|
||||
return singletonList(new RootFieldAttribute(location(), "plan", DataTypes.KEYWORD));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Override
|
||||
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
|
||||
String planString = null;
|
||||
|
||||
ExecutionInfo info = null;
|
||||
|
||||
switch (type) {
|
||||
case ANALYZED:
|
||||
info = session.analyzer().debugAnalyze(plan);
|
||||
session.debugAnalyzedPlan(plan, wrap(i -> handleInfo(i, listener), listener::onFailure));
|
||||
break;
|
||||
|
||||
case OPTIMIZED:
|
||||
info = session.optimizer().debugOptimize(session.analyzedPlan(plan, true));
|
||||
session.analyzedPlan(plan, true,
|
||||
wrap(analyzedPlan -> handleInfo(session.optimizer().debugOptimize(analyzedPlan), listener), listener::onFailure));
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private void handleInfo(ExecutionInfo info, ActionListener<SchemaRowSet> listener) {
|
||||
String planString = null;
|
||||
|
||||
if (format == Format.TEXT) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (info == null) {
|
||||
sb.append(plan.toString());
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
Map<Batch, List<Transformation>> map = info.transformations();
|
||||
|
||||
for (Entry<Batch, List<Transformation>> entry : map.entrySet()) {
|
||||
@ -110,12 +108,10 @@ public class Debug extends Command {
|
||||
}
|
||||
}
|
||||
planString = sb.toString();
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
if (info == null) {
|
||||
planString = Graphviz.dot("Planned", plan);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
Map<String, Node<?>> plans = new LinkedHashMap<>();
|
||||
Map<Batch, List<Transformation>> map = info.transformations();
|
||||
plans.put("start", info.before());
|
||||
@ -150,8 +146,6 @@ public class Debug extends Command {
|
||||
return false;
|
||||
}
|
||||
Debug o = (Debug) obj;
|
||||
return Objects.equals(format, o.format)
|
||||
&& Objects.equals(type, o.type)
|
||||
&& Objects.equals(plan, o.plan);
|
||||
return Objects.equals(format, o.format) && Objects.equals(type, o.type) && Objects.equals(plan, o.plan);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,13 +6,13 @@
|
||||
package org.elasticsearch.xpack.sql.plan.logical.command;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
|
||||
import org.elasticsearch.xpack.sql.plan.QueryPlan;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.sql.planner.Planner;
|
||||
import org.elasticsearch.xpack.sql.session.RowSet;
|
||||
import org.elasticsearch.xpack.sql.session.Rows;
|
||||
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
|
||||
import org.elasticsearch.xpack.sql.session.SqlSession;
|
||||
@ -22,16 +22,22 @@ import org.elasticsearch.xpack.sql.util.Graphviz;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.action.ActionListener.wrap;
|
||||
|
||||
public class Explain extends Command {
|
||||
|
||||
public enum Type {
|
||||
PARSED, ANALYZED, OPTIMIZED, MAPPED, EXECUTABLE, ALL
|
||||
PARSED, ANALYZED, OPTIMIZED, MAPPED, EXECUTABLE, ALL;
|
||||
|
||||
public String printableName() {
|
||||
return Strings.capitalize(name().toLowerCase(Locale.ROOT));
|
||||
}
|
||||
}
|
||||
|
||||
public enum Format {
|
||||
@ -74,99 +80,135 @@ public class Explain extends Command {
|
||||
|
||||
@Override
|
||||
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
|
||||
String planString = null;
|
||||
String planName = "Parsed";
|
||||
|
||||
if (type == Type.ALL) {
|
||||
LogicalPlan analyzedPlan = session.analyzedPlan(plan, verify);
|
||||
LogicalPlan optimizedPlan = null;
|
||||
PhysicalPlan mappedPlan = null, executionPlan = null;
|
||||
|
||||
if (type == Type.PARSED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, plan)));
|
||||
return;
|
||||
}
|
||||
|
||||
// to avoid duplicating code, the type/verification filtering happens inside the listeners instead of outside using a CASE
|
||||
session.analyzedPlan(plan, verify, wrap(analyzedPlan -> {
|
||||
|
||||
if (type == Type.ANALYZED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
Planner planner = session.planner();
|
||||
|
||||
// verification is on, exceptions can be thrown
|
||||
if (verify) {
|
||||
optimizedPlan = session.optimizedPlan(plan);
|
||||
mappedPlan = planner.mapPlan(optimizedPlan, verify);
|
||||
executionPlan = planner.foldPlan(mappedPlan, verify);
|
||||
session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> {
|
||||
if (type == Type.OPTIMIZED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify);
|
||||
if (type == Type.MAPPED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify);
|
||||
if (type == Type.EXECUTABLE) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
// Type.All
|
||||
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)));
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
// check errors manually to see how far the plans work out
|
||||
else {
|
||||
// no analysis failure, can move on
|
||||
if (session.analyzer().verifyFailures(analyzedPlan).isEmpty()) {
|
||||
optimizedPlan = session.optimizedPlan(analyzedPlan);
|
||||
if (optimizedPlan != null) {
|
||||
mappedPlan = planner.mapPlan(optimizedPlan, verify);
|
||||
if (planner.verifyMappingPlanFailures(mappedPlan).isEmpty()) {
|
||||
executionPlan = planner.foldPlan(mappedPlan, verify);
|
||||
session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> {
|
||||
if (type == Type.OPTIMIZED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify);
|
||||
|
||||
if (type == Type.MAPPED) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
if (planner.verifyMappingPlanFailures(mappedPlan).isEmpty()) {
|
||||
PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify);
|
||||
|
||||
if (type == Type.EXECUTABLE) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)));
|
||||
return;
|
||||
}
|
||||
// mapped failed
|
||||
if (type != Type.ALL) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
|
||||
return;
|
||||
}
|
||||
|
||||
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, null)));
|
||||
}, listener::onFailure));
|
||||
// cannot continue
|
||||
} else {
|
||||
if (type != Type.ALL) {
|
||||
listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan)));
|
||||
}
|
||||
else {
|
||||
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
if (format == Format.TEXT) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Parsed\n");
|
||||
sb.append("-----------\n");
|
||||
sb.append(plan.toString());
|
||||
sb.append("\nAnalyzed\n");
|
||||
sb.append("--------\n");
|
||||
sb.append(analyzedPlan.toString());
|
||||
sb.append("\nOptimized\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(optimizedPlan.toString());
|
||||
sb.append("\nMapped\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(mappedPlan.toString());
|
||||
sb.append("\nExecutable\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(executionPlan.toString());
|
||||
private static String printPlans(Format format, LogicalPlan parsed, LogicalPlan analyzedPlan, LogicalPlan optimizedPlan, PhysicalPlan mappedPlan, PhysicalPlan executionPlan) {
|
||||
if (format == Format.TEXT) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Parsed\n");
|
||||
sb.append("-----------\n");
|
||||
sb.append(parsed.toString());
|
||||
sb.append("\nAnalyzed\n");
|
||||
sb.append("--------\n");
|
||||
sb.append(analyzedPlan.toString());
|
||||
sb.append("\nOptimized\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(nullablePlan(optimizedPlan));
|
||||
sb.append("\nMapped\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(nullablePlan(mappedPlan));
|
||||
sb.append("\nExecutable\n");
|
||||
sb.append("---------\n");
|
||||
sb.append(nullablePlan(executionPlan));
|
||||
|
||||
planString = sb.toString();
|
||||
} else {
|
||||
Map<String, QueryPlan<?>> plans = new HashMap<>();
|
||||
plans.put("Parsed", plan);
|
||||
plans.put("Analyzed", analyzedPlan);
|
||||
return sb.toString();
|
||||
} else {
|
||||
Map<String, QueryPlan<?>> plans = new HashMap<>();
|
||||
plans.put("Parsed", parsed);
|
||||
plans.put("Analyzed", analyzedPlan);
|
||||
|
||||
if (optimizedPlan != null) {
|
||||
plans.put("Optimized", optimizedPlan);
|
||||
plans.put("Mapped", mappedPlan);
|
||||
plans.put("Execution", executionPlan);
|
||||
planString = Graphviz.dot(unmodifiableMap(plans), false);
|
||||
}
|
||||
return Graphviz.dot(unmodifiableMap(plans), false);
|
||||
}
|
||||
}
|
||||
|
||||
else {
|
||||
QueryPlan<?> queryPlan = null;
|
||||
private static String nullablePlan(QueryPlan<?> plan) {
|
||||
return plan != null ? plan.toString() : "<not computed due to failures>";
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case PARSED:
|
||||
queryPlan = plan;
|
||||
planName = "Parsed";
|
||||
break;
|
||||
case ANALYZED:
|
||||
queryPlan = session.analyzedPlan(plan, verify);
|
||||
planName = "Analyzed";
|
||||
break;
|
||||
case OPTIMIZED:
|
||||
queryPlan = session.optimizedPlan(session.analyzedPlan(plan, verify));
|
||||
planName = "Optimized";
|
||||
break;
|
||||
case MAPPED:
|
||||
queryPlan = session.planner().mapPlan(session.optimizedPlan(session.analyzedPlan(plan, verify)), verify);
|
||||
planName = "Mapped";
|
||||
break;
|
||||
case EXECUTABLE:
|
||||
queryPlan = session.planner().foldPlan(session.planner().mapPlan(session.optimizedPlan(session.analyzedPlan(plan, verify)), verify), verify);
|
||||
planName = "Executable";
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
planString = (format == Format.TEXT ? queryPlan.toString() : Graphviz.dot(planName, queryPlan));
|
||||
}
|
||||
|
||||
listener.onResponse(Rows.singleton(output(), planString));
|
||||
private String formatPlan(Format format, QueryPlan<?> plan) {
|
||||
return (format == Format.TEXT ? nullablePlan(plan) : Graphviz.dot(type.printableName(), plan));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -188,4 +230,4 @@ public class Explain extends Command {
|
||||
&& Objects.equals(type, o.type)
|
||||
&& Objects.equals(plan, o.plan);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,7 +6,7 @@
|
||||
package org.elasticsearch.xpack.sql.plan.logical.command;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
|
||||
import org.elasticsearch.xpack.sql.session.Rows;
|
||||
@ -24,6 +24,7 @@ import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
public class ShowColumns extends Command {
|
||||
|
||||
@ -46,12 +47,13 @@ public class ShowColumns extends Command {
|
||||
|
||||
@Override
|
||||
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
|
||||
session.getIndices(new String[]{index}, IndicesOptions.strictExpandOpenAndForbidClosed(), ActionListener.wrap(
|
||||
esIndices -> {
|
||||
List<List<?>> rows = new ArrayList<>();
|
||||
if (esIndices.isEmpty() == false) {
|
||||
//TODO: we are using only the first index for now - add support for aliases
|
||||
fillInRows(esIndices.get(0).mapping(), null, rows);
|
||||
session.indexResolver().asCatalog(index, ActionListener.wrap(
|
||||
c -> {
|
||||
List<List<?>> rows = emptyList();
|
||||
GetIndexResult indexResult = c.getIndex(index);
|
||||
if (indexResult.isValid()) {
|
||||
rows = new ArrayList<>();
|
||||
fillInRows(indexResult.get().mapping(), null, rows);
|
||||
}
|
||||
listener.onResponse(Rows.of(output(), rows));
|
||||
},
|
||||
@ -91,4 +93,4 @@ public class ShowColumns extends Command {
|
||||
ShowColumns other = (ShowColumns) obj;
|
||||
return Objects.equals(index, other.index);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,12 +6,10 @@
|
||||
package org.elasticsearch.xpack.sql.plan.logical.command;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
|
||||
import org.elasticsearch.xpack.sql.session.RowSet;
|
||||
import org.elasticsearch.xpack.sql.session.Rows;
|
||||
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
|
||||
import org.elasticsearch.xpack.sql.session.SqlSession;
|
||||
@ -19,10 +17,10 @@ import org.elasticsearch.xpack.sql.tree.Location;
|
||||
import org.elasticsearch.xpack.sql.type.DataTypes;
|
||||
import org.elasticsearch.xpack.sql.util.StringUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
@ -42,17 +40,18 @@ public class ShowTables extends Command {
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return asList(new RootFieldAttribute(location(), "table", DataTypes.KEYWORD));
|
||||
return Arrays.asList(new RootFieldAttribute(location(), "table", DataTypes.KEYWORD));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
|
||||
String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*";
|
||||
session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
|
||||
|
||||
session.indexResolver().asList(ActionListener.wrap(result -> {
|
||||
listener.onResponse(Rows.of(output(), result.stream()
|
||||
.map(t -> singletonList(t.name()))
|
||||
.collect(toList())));
|
||||
}, listener::onFailure));
|
||||
}, listener::onFailure), pattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,4 +71,4 @@ public abstract class AbstractSqlProtocolRestAction extends BaseRestHandler {
|
||||
}
|
||||
return innerPrepareRequest(request, client);
|
||||
}
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
|
||||
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
|
||||
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
|
||||
@ -57,11 +58,14 @@ import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
|
||||
|
||||
public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||
private final SqlLicenseChecker sqlLicenseChecker;
|
||||
private final IndexResolver indexResolver;
|
||||
|
||||
public RestSqlJdbcAction(Settings settings, RestController controller, SqlLicenseChecker sqlLicenseChecker) {
|
||||
public RestSqlJdbcAction(Settings settings, RestController controller, SqlLicenseChecker sqlLicenseChecker,
|
||||
IndexResolver indexResolver) {
|
||||
super(settings, Proto.INSTANCE);
|
||||
controller.registerHandler(POST, "/_sql/jdbc", this);
|
||||
this.sqlLicenseChecker = sqlLicenseChecker;
|
||||
this.indexResolver = indexResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -102,38 +106,60 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||
}
|
||||
|
||||
private Consumer<RestChannel> metaTable(Client client, MetaTableRequest request) {
|
||||
//TODO once mappings are filtered this can go directly to the IndexResolver (code commented out below)
|
||||
String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*";
|
||||
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
||||
getRequest.local(true); // TODO serialization not supported by get indices action
|
||||
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> {
|
||||
return new MetaTableResponse(response.indices().stream()
|
||||
.map(EsIndex::name)
|
||||
.collect(toList()));
|
||||
}));
|
||||
getRequest.local(true);
|
||||
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel,
|
||||
response -> new MetaTableResponse(response.indices().stream().map(EsIndex::name).collect(toList()))));
|
||||
/*String indexPattern = hasText(request.pattern()) ? StringUtils.jdbcToEsPattern(request.pattern()) : "*";
|
||||
return channel -> indexResolver.asList(indexPattern, toActionListener(request, channel, list ->
|
||||
new MetaTableResponse(list.stream()
|
||||
.map(EsIndex::name)
|
||||
.collect(toList()))));*/
|
||||
}
|
||||
|
||||
private Consumer<RestChannel> metaColumn(Client client, MetaColumnRequest request) {
|
||||
//TODO once mappings are filtered this can go directly to the IndexResolver (code commented out below)
|
||||
String indexPattern = Strings.hasText(request.tablePattern()) ? StringUtils.jdbcToEsPattern(request.tablePattern()) : "*";
|
||||
Pattern columnMatcher = hasText(request.columnPattern()) ? StringUtils.likeRegex(request.columnPattern()) : null;
|
||||
|
||||
SqlGetIndicesAction.Request getRequest = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), indexPattern);
|
||||
getRequest.local(true); // TODO serialization not supported by get indices action
|
||||
getRequest.local(true);
|
||||
return channel -> client.execute(SqlGetIndicesAction.INSTANCE, getRequest, toActionListener(channel, response -> {
|
||||
List<MetaColumnInfo> columns = new ArrayList<>();
|
||||
for (EsIndex esIndex : response.indices()) {
|
||||
int pos = 0;
|
||||
for (Map.Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
|
||||
pos++;
|
||||
String name = entry.getKey();
|
||||
if (columnMatcher == null || columnMatcher.matcher(name).matches()) {
|
||||
DataType type = entry.getValue();
|
||||
// the column size it's actually its precision (based on the Javadocs)
|
||||
columns.add(new MetaColumnInfo(esIndex.name(), name, type.sqlType(), type.precision(), pos));
|
||||
}
|
||||
}
|
||||
int pos = 0;
|
||||
for (Map.Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
|
||||
pos++;
|
||||
String name = entry.getKey();
|
||||
if (columnMatcher == null || columnMatcher.matcher(name).matches()) {
|
||||
DataType type = entry.getValue();
|
||||
// the column size it's actually its precision (based on the Javadocs)
|
||||
columns.add(new MetaColumnInfo(esIndex.name(), name, type.sqlType(), type.precision(), pos));
|
||||
}
|
||||
}
|
||||
}
|
||||
return new MetaColumnResponse(columns);
|
||||
}));
|
||||
/*String indexPattern = Strings.hasText(request.tablePattern()) ? StringUtils.jdbcToEsPattern(request.tablePattern()) : "*";
|
||||
Pattern columnMatcher = hasText(request.columnPattern()) ? StringUtils.likeRegex(request.columnPattern()) : null;
|
||||
return channel -> indexResolver.asList(indexPattern, toActionListener(request, channel, esIndices -> {
|
||||
List<MetaColumnInfo> columns = new ArrayList<>();
|
||||
for (EsIndex esIndex : esIndices) {
|
||||
int pos = 0;
|
||||
for (Map.Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
|
||||
pos++;
|
||||
String name = entry.getKey();
|
||||
if (columnMatcher == null || columnMatcher.matcher(name).matches()) {
|
||||
DataType type = entry.getValue();
|
||||
// the column size it's actually its precision (based on the Javadocs)
|
||||
columns.add(new MetaColumnInfo(esIndex.name(), name, type.sqlType(), type.precision(), pos));
|
||||
}
|
||||
}
|
||||
}
|
||||
return new MetaColumnResponse(columns);
|
||||
}));*/
|
||||
}
|
||||
|
||||
private Consumer<RestChannel> queryInit(Client client, QueryInitRequest request) {
|
||||
@ -170,10 +196,9 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
|
||||
TimeValue.timeValueMillis(request.timeout.pageTimeout),
|
||||
cursor);
|
||||
long start = System.nanoTime();
|
||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel, response -> {
|
||||
return new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
|
||||
new SqlResponsePayload(types, response.rows()));
|
||||
}));
|
||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, toActionListener(channel,
|
||||
response -> new QueryPageResponse(System.nanoTime() - start, serializeCursor(response.cursor(), types),
|
||||
new SqlResponsePayload(types, response.rows()))));
|
||||
}
|
||||
|
||||
private static byte[] serializeCursor(Cursor cursor, List<JDBCType> types) {
|
||||
|
@ -27,19 +27,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Comparator.comparing;
|
||||
|
||||
public class SqlGetIndicesAction
|
||||
extends Action<SqlGetIndicesAction.Request, SqlGetIndicesAction.Response, SqlGetIndicesAction.RequestBuilder> {
|
||||
@ -181,17 +175,18 @@ public class SqlGetIndicesAction
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
|
||||
private final Function<ClusterState, Catalog> catalogSupplier;
|
||||
private final IndexResolver indexResolver;
|
||||
private final SqlLicenseChecker licenseChecker;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, CatalogHolder catalog, SqlLicenseChecker licenseChecker) {
|
||||
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, SqlLicenseChecker licenseChecker,
|
||||
IndexResolver indexResolver) {
|
||||
super(settings, NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
Request::new, indexNameExpressionResolver);
|
||||
this.catalogSupplier = catalog.catalogSupplier;
|
||||
this.licenseChecker = licenseChecker;
|
||||
this.indexResolver = indexResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -208,7 +203,7 @@ public class SqlGetIndicesAction
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
|
||||
licenseChecker.checkIfSqlAllowed();
|
||||
operation(indexNameExpressionResolver, catalogSupplier, request, state, listener);
|
||||
operation(indexResolver, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -216,43 +211,10 @@ public class SqlGetIndicesAction
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
|
||||
indexNameExpressionResolver.concreteIndexNames(state, request));
|
||||
}
|
||||
|
||||
/**
|
||||
* Class that holds that {@link Catalog} to aid in guice binding.
|
||||
*/
|
||||
public static class CatalogHolder {
|
||||
final Function<ClusterState, Catalog> catalogSupplier;
|
||||
|
||||
public CatalogHolder(Function<ClusterState, Catalog> catalogSupplier) {
|
||||
this.catalogSupplier = catalogSupplier;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually looks up the indices in the cluster state and converts
|
||||
* them into {@link EsIndex} instances. The rest of the contents of
|
||||
* this class integrates this behavior cleanly into Elasticsearch,
|
||||
* makes sure that we only try and read the cluster state when it is
|
||||
* ready, integrate with security to filter the requested indices to
|
||||
* what the user has permission to access, and leaves an appropriate
|
||||
* audit trail.
|
||||
*/
|
||||
public static void operation(IndexNameExpressionResolver indexNameExpressionResolver, Function<ClusterState, Catalog> catalogSupplier,
|
||||
Request request, ClusterState state, ActionListener<Response> listener) {
|
||||
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request);
|
||||
List<EsIndex> results = new ArrayList<>(concreteIndices.length);
|
||||
Catalog catalog = catalogSupplier.apply(state);
|
||||
for (String index : concreteIndices) {
|
||||
GetIndexResult result = catalog.getIndex(index);
|
||||
if (result.isValid()) {
|
||||
results.add(result.get());
|
||||
}
|
||||
}
|
||||
|
||||
// Consistent sorting is better for testing and for humans
|
||||
Collections.sort(results, comparing(EsIndex::name));
|
||||
|
||||
listener.onResponse(new Response(results));
|
||||
static void operation(IndexResolver indexResolver, Request request, ActionListener<Response> listener) {
|
||||
indexResolver.asList(ActionListener.wrap(results -> listener.onResponse(new Response(results)), listener::onFailure),
|
||||
request.indices);
|
||||
}
|
||||
}
|
||||
}
|
@ -8,10 +8,8 @@ package org.elasticsearch.xpack.sql.plugin;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
@ -21,11 +19,9 @@ import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
|
||||
import org.elasticsearch.xpack.sql.plugin.sql.rest.RestSqlAction;
|
||||
@ -34,55 +30,53 @@ import org.elasticsearch.xpack.sql.session.Cursor;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
public class SqlPlugin implements ActionPlugin {
|
||||
|
||||
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
|
||||
return Cursor.getNamedWriteables();
|
||||
}
|
||||
|
||||
private final SqlLicenseChecker sqlLicenseChecker;
|
||||
|
||||
private final boolean enabled;
|
||||
private final SqlLicenseChecker sqlLicenseChecker;
|
||||
private IndexResolver indexResolver;
|
||||
|
||||
public SqlPlugin(boolean enabled, SqlLicenseChecker sqlLicenseChecker) {
|
||||
this.sqlLicenseChecker = sqlLicenseChecker;
|
||||
this.enabled = enabled;
|
||||
this.sqlLicenseChecker = sqlLicenseChecker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create components used by the sql plugin.
|
||||
* @param catalogFilter if non-null it is a filter for the catalog to apply security
|
||||
*/
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService,
|
||||
@Nullable FilteredCatalog.Filter catalogFilter) {
|
||||
public Collection<Object> createComponents(Client client, @Nullable FilteredCatalog.Filter catalogFilter) {
|
||||
if (false == enabled) {
|
||||
return emptyList();
|
||||
}
|
||||
Function<ClusterState, Catalog> catalog = EsCatalog::new;
|
||||
if (catalogFilter != null) {
|
||||
catalog = catalog.andThen(c -> new FilteredCatalog(c, catalogFilter));
|
||||
}
|
||||
indexResolver = new IndexResolver(client, catalogFilter);
|
||||
return Arrays.asList(
|
||||
new CatalogHolder(catalog),
|
||||
sqlLicenseChecker,
|
||||
new PlanExecutor(client, clusterService::state, catalog));
|
||||
indexResolver,
|
||||
new PlanExecutor(client, indexResolver));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
|
||||
ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
|
||||
if (false == enabled) {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
return Arrays.asList(new RestSqlAction(settings, restController),
|
||||
new SqlTranslateAction.RestAction(settings, restController),
|
||||
new RestSqlCliAction(settings, restController),
|
||||
new RestSqlJdbcAction(settings, restController, sqlLicenseChecker));
|
||||
new RestSqlJdbcAction(settings, restController, sqlLicenseChecker, indexResolver));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -90,6 +84,7 @@ public class SqlPlugin implements ActionPlugin {
|
||||
if (false == enabled) {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class),
|
||||
new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class),
|
||||
new ActionHandler<>(SqlTranslateAction.INSTANCE, SqlTranslateAction.TransportAction.class));
|
||||
|
@ -189,7 +189,8 @@ public class SqlTranslateAction
|
||||
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(),
|
||||
request.requestTimeout(), request.pageTimeout(), request.filter());
|
||||
|
||||
listener.onResponse(new Response(planExecutor.searchSource(query, cfg)));
|
||||
planExecutor.searchSource(query, cfg, ActionListener.wrap(
|
||||
searchSourceBuilder -> listener.onResponse(new Response(searchSourceBuilder)), listener::onFailure));
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,9 +207,7 @@ public class SqlTranslateAction
|
||||
try (XContentParser parser = request.contentOrSourceParamParser()) {
|
||||
sqlRequest = Request.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
return channel -> client.executeLocally(SqlTranslateAction.INSTANCE,
|
||||
sqlRequest, new RestToXContentListener<Response>(channel));
|
||||
return channel -> client.executeLocally(SqlTranslateAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -49,9 +49,10 @@ public class RestSqlAction extends BaseRestHandler {
|
||||
if (xContentType != null) {
|
||||
// The client expects us to send back results in a XContent format
|
||||
return channel -> client.executeLocally(SqlAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
// The client accepts plain text
|
||||
long startNanos = System.nanoTime();
|
||||
|
||||
return channel -> client.execute(SqlAction.INSTANCE, sqlRequest, new RestResponseListener<SqlResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(SqlResponse response) throws Exception {
|
||||
|
@ -6,11 +6,13 @@
|
||||
package org.elasticsearch.xpack.sql.session;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer.PreAnalysis;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
|
||||
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
|
||||
@ -18,57 +20,84 @@ import org.elasticsearch.xpack.sql.parser.SqlParser;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.sql.planner.Planner;
|
||||
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
|
||||
import org.elasticsearch.xpack.sql.rule.RuleExecutor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.action.ActionListener.wrap;
|
||||
|
||||
public class SqlSession {
|
||||
|
||||
private final Client client;
|
||||
private final Catalog catalog;
|
||||
|
||||
private final SqlParser parser;
|
||||
private final FunctionRegistry functionRegistry;
|
||||
private final IndexResolver indexResolver;
|
||||
private final PreAnalyzer preAnalyzer;
|
||||
private final Analyzer analyzer;
|
||||
private final Optimizer optimizer;
|
||||
private final Planner planner;
|
||||
private final Analyzer analyzer;
|
||||
|
||||
private Configuration settings;
|
||||
|
||||
public static class SessionContext {
|
||||
|
||||
public final Configuration configuration;
|
||||
public final Catalog catalog;
|
||||
|
||||
SessionContext(Configuration configuration, Catalog catalog) {
|
||||
this.configuration = configuration;
|
||||
this.catalog = catalog;
|
||||
}
|
||||
}
|
||||
|
||||
// thread-local used for sharing settings across the plan compilation
|
||||
// Currently this is used during:
|
||||
// 1. parsing - to set the TZ in date time functions (if they are used)
|
||||
// 2. analysis - to compute the Catalog and share it across the rules
|
||||
// Might be used in
|
||||
// 3. Optimization - to pass in configs around plan hints/settings
|
||||
// 4. Folding/mapping - same as above
|
||||
|
||||
// TODO investigate removing
|
||||
public static final ThreadLocal<Configuration> CURRENT_SETTINGS = new ThreadLocal<Configuration>() {
|
||||
static final ThreadLocal<SessionContext> CURRENT_CONTEXT = new ThreadLocal<SessionContext>() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SQL Session";
|
||||
return "SQL SessionContext";
|
||||
}
|
||||
};
|
||||
|
||||
public SqlSession(SqlSession other) {
|
||||
this(other.settings(), other.client(), other.catalog(), other.functionRegistry(),
|
||||
other.parser, other.optimizer(), other.planner());
|
||||
this(other.settings, other.client, other.functionRegistry, other.parser, other.indexResolver,
|
||||
other.preAnalyzer, other.analyzer, other.optimizer,other.planner);
|
||||
}
|
||||
|
||||
public SqlSession(Configuration defaults, Client client,
|
||||
Catalog catalog, FunctionRegistry functionRegistry,
|
||||
public SqlSession(Configuration settings, Client client, FunctionRegistry functionRegistry,
|
||||
SqlParser parser,
|
||||
IndexResolver indexResolver,
|
||||
PreAnalyzer preAnalyzer,
|
||||
Analyzer analyzer,
|
||||
Optimizer optimizer,
|
||||
Planner planner) {
|
||||
this.client = client;
|
||||
this.catalog = catalog;
|
||||
this.functionRegistry = functionRegistry;
|
||||
|
||||
this.parser = parser;
|
||||
this.functionRegistry = functionRegistry;
|
||||
//TODO: analyzer should really be a singleton
|
||||
this.analyzer = new Analyzer(functionRegistry, catalog);
|
||||
this.indexResolver = indexResolver;
|
||||
this.preAnalyzer = preAnalyzer;
|
||||
this.analyzer = analyzer;
|
||||
this.optimizer = optimizer;
|
||||
this.planner = planner;
|
||||
|
||||
this.settings = defaults;
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
public Catalog catalog() {
|
||||
return catalog;
|
||||
public static SessionContext currentContext() {
|
||||
SessionContext ctx = CURRENT_CONTEXT.get();
|
||||
if (ctx == null) {
|
||||
throw new SqlIllegalArgumentException("Context is accessible only during the session");
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
public FunctionRegistry functionRegistry() {
|
||||
@ -79,20 +108,14 @@ public class SqlSession {
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the indices matching a pattern. Prefer this method if possible.
|
||||
*/
|
||||
public void getIndices(String[] patterns, IndicesOptions options, ActionListener<List<EsIndex>> listener) {
|
||||
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(options, patterns).local(true);
|
||||
client.execute(SqlGetIndicesAction.INSTANCE, request, ActionListener.wrap(response -> {
|
||||
listener.onResponse(response.indices());
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
public Planner planner() {
|
||||
return planner;
|
||||
}
|
||||
|
||||
public IndexResolver indexResolver() {
|
||||
return indexResolver;
|
||||
}
|
||||
|
||||
public Analyzer analyzer() {
|
||||
return analyzer;
|
||||
}
|
||||
@ -101,45 +124,91 @@ public class SqlSession {
|
||||
return optimizer;
|
||||
}
|
||||
|
||||
public LogicalPlan parse(String sql) {
|
||||
return parser.createStatement(sql);
|
||||
}
|
||||
|
||||
public Expression expression(String expression) {
|
||||
return parser.createExpression(expression);
|
||||
}
|
||||
|
||||
public LogicalPlan analyzedPlan(LogicalPlan plan, boolean verify) {
|
||||
Analyzer analyzer = analyzer();
|
||||
return verify ? analyzer.verify(analyzer.analyze(plan)) : analyzer.analyze(plan);
|
||||
}
|
||||
|
||||
public LogicalPlan optimizedPlan(LogicalPlan verified) {
|
||||
return optimizer.optimize(analyzedPlan(verified, true));
|
||||
}
|
||||
|
||||
public PhysicalPlan physicalPlan(LogicalPlan optimized, boolean verify) {
|
||||
return planner.plan(optimizedPlan(optimized), verify);
|
||||
}
|
||||
|
||||
public PhysicalPlan executable(String sql) {
|
||||
CURRENT_SETTINGS.set(settings);
|
||||
private LogicalPlan doParse(String sql) {
|
||||
try {
|
||||
return physicalPlan(parse(sql), true);
|
||||
// NB: it's okay for the catalog to be empty - parsing only cares about the configuration
|
||||
CURRENT_CONTEXT.set(new SessionContext(settings, Catalog.EMPTY));
|
||||
return parser.createStatement(sql);
|
||||
} finally {
|
||||
CURRENT_SETTINGS.remove();
|
||||
CURRENT_CONTEXT.remove();
|
||||
}
|
||||
}
|
||||
|
||||
public void analyzedPlan(LogicalPlan parsed, boolean verify, ActionListener<LogicalPlan> listener) {
|
||||
if (parsed.analyzed()) {
|
||||
listener.onResponse(parsed);
|
||||
return;
|
||||
}
|
||||
|
||||
preAnalyze(parsed, c -> {
|
||||
try {
|
||||
CURRENT_CONTEXT.set(new SessionContext(settings, c));
|
||||
return verify ? analyzer.verify(analyzer.analyze(parsed)) : analyzer.analyze(parsed);
|
||||
} finally {
|
||||
CURRENT_CONTEXT.remove();
|
||||
}
|
||||
}, listener);
|
||||
}
|
||||
|
||||
public void debugAnalyzedPlan(LogicalPlan parsed, ActionListener<RuleExecutor<LogicalPlan>.ExecutionInfo> listener) {
|
||||
if (parsed.analyzed()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
|
||||
preAnalyze(parsed, c -> {
|
||||
try {
|
||||
CURRENT_CONTEXT.set(new SessionContext(settings, c));
|
||||
return analyzer.debugAnalyze(parsed);
|
||||
} finally {
|
||||
CURRENT_CONTEXT.remove();
|
||||
}
|
||||
}, listener);
|
||||
}
|
||||
|
||||
private <T> void preAnalyze(LogicalPlan parsed, Function<Catalog, T> action, ActionListener<T> listener) {
|
||||
PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
|
||||
if (preAnalysis.indices.size() > 1) {
|
||||
listener.onFailure(new SqlIllegalArgumentException("Queries with multiple indices are not supported"));
|
||||
return;
|
||||
}
|
||||
if (preAnalysis.indices.size() == 1) {
|
||||
indexResolver.asCatalog(preAnalysis.indices.get(0),
|
||||
wrap(c -> listener.onResponse(action.apply(c)), listener::onFailure));
|
||||
} else {
|
||||
try {
|
||||
listener.onResponse(action.apply(Catalog.EMPTY));
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void optimizedPlan(LogicalPlan verified, ActionListener<LogicalPlan> listener) {
|
||||
analyzedPlan(verified, true, wrap(v -> listener.onResponse(optimizer.optimize(v)), listener::onFailure));
|
||||
}
|
||||
|
||||
public void physicalPlan(LogicalPlan optimized, boolean verify, ActionListener<PhysicalPlan> listener) {
|
||||
optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o, verify)), listener::onFailure));
|
||||
}
|
||||
|
||||
public void sql(String sql, ActionListener<SchemaRowSet> listener) {
|
||||
executable(sql).execute(this, listener);
|
||||
sqlExecutable(sql, wrap(e -> e.execute(this, listener), listener::onFailure));
|
||||
}
|
||||
|
||||
public void sqlExecutable(String sql, ActionListener<PhysicalPlan> listener) {
|
||||
try {
|
||||
physicalPlan(doParse(sql), true, listener);
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public Configuration settings() {
|
||||
return settings;
|
||||
}
|
||||
|
||||
public void execute(PhysicalPlan plan, ActionListener<SchemaRowSet> listener) {
|
||||
plan.execute(this, listener);
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.analyzer;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.analyzer.PreAnalyzer.PreAnalysis;
|
||||
import org.elasticsearch.xpack.sql.parser.SqlParser;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class PreAnalyzerTests extends ESTestCase {
|
||||
|
||||
private SqlParser parser = new SqlParser();
|
||||
private PreAnalyzer preAnalyzer = new PreAnalyzer();
|
||||
|
||||
public void testBasicIndex() {
|
||||
LogicalPlan plan = parser.createStatement("SELECT * FROM index");
|
||||
PreAnalysis result = preAnalyzer.preAnalyze(plan);
|
||||
assertThat(plan.preAnalyzed(), is(true));
|
||||
assertThat(result.indices, contains("index"));
|
||||
}
|
||||
|
||||
public void testQuotedIndex() {
|
||||
LogicalPlan plan = parser.createStatement("SELECT * FROM \"aaa\"");
|
||||
PreAnalysis result = preAnalyzer.preAnalyze(plan);
|
||||
assertThat(plan.preAnalyzed(), is(true));
|
||||
assertThat(result.indices, contains("aaa"));
|
||||
}
|
||||
|
||||
public void testComplicatedQuery() {
|
||||
LogicalPlan plan = parser.createStatement("SELECT MAX(a) FROM aaa WHERE d > 10 GROUP BY b HAVING AVG(c) ORDER BY e ASC");
|
||||
PreAnalysis result = preAnalyzer.preAnalyze(plan);
|
||||
assertThat(plan.preAnalyzed(), is(true));
|
||||
assertThat(result.indices, contains("aaa"));
|
||||
}
|
||||
}
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.analyzer;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
@ -14,13 +13,15 @@ import org.elasticsearch.xpack.sql.analysis.catalog.InMemoryCatalog;
|
||||
import org.elasticsearch.xpack.sql.expression.function.DefaultFunctionRegistry;
|
||||
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
|
||||
import org.elasticsearch.xpack.sql.parser.SqlParser;
|
||||
import org.elasticsearch.xpack.sql.session.TestingSqlSession;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.type.DataTypes;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
public class VerifierErrorMessagesTests extends ESTestCase {
|
||||
@ -39,9 +40,19 @@ public class VerifierErrorMessagesTests extends ESTestCase {
|
||||
mapping.put("int", DataTypes.INTEGER);
|
||||
mapping.put("text", DataTypes.TEXT);
|
||||
mapping.put("keyword", DataTypes.KEYWORD);
|
||||
EsIndex test = new EsIndex("test", mapping, emptyList(), Settings.EMPTY);
|
||||
EsIndex test = new EsIndex("test", mapping);
|
||||
catalog = new InMemoryCatalog(singletonList(test));
|
||||
analyzer = new Analyzer(functionRegistry, catalog);
|
||||
analyzer = new Analyzer(functionRegistry);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupContext() {
|
||||
TestingSqlSession.setCurrentContext(TestingSqlSession.ctx(catalog));
|
||||
}
|
||||
|
||||
@After
|
||||
public void disposeContext() {
|
||||
TestingSqlSession.removeCurrentContext();
|
||||
}
|
||||
|
||||
private String verify(String sql) {
|
||||
@ -59,7 +70,7 @@ public class VerifierErrorMessagesTests extends ESTestCase {
|
||||
assertEquals("1:8: Unknown column [xxx]", verify("SELECT xxx FROM test"));
|
||||
}
|
||||
|
||||
public void testMispelledColumn() {
|
||||
public void testMisspelledColumn() {
|
||||
assertEquals("1:8: Unknown column [txt], did you mean [text]?", verify("SELECT txt FROM test"));
|
||||
}
|
||||
|
||||
|
@ -1,110 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class EsCatalogTests extends ESTestCase {
|
||||
public void testEmpty() {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT).build());
|
||||
assertEquals(GetIndexResult.notFound("test"), catalog.getIndex("test"));
|
||||
}
|
||||
|
||||
public void testIndexExists() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test")
|
||||
.putMapping("test", "{}")))
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex("test");
|
||||
assertTrue(result.isValid());
|
||||
assertEquals(emptyMap(), result.get().mapping());
|
||||
}
|
||||
|
||||
public void testIndexAbsent() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test")
|
||||
.putMapping("test", "{}")))
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex("foo");
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
public void testIndexWithDefaultType() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test")
|
||||
.putMapping("test", "{}")
|
||||
.putMapping("_default_", "{}")))
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex("test");
|
||||
assertTrue(result.isValid());
|
||||
assertEquals(emptyMap(), result.get().mapping());
|
||||
}
|
||||
|
||||
public void testIndexWithTwoTypes() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test")
|
||||
.putMapping("first_type", "{}")
|
||||
.putMapping("second_type", "{}")))
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex("test");
|
||||
assertFalse(result.isValid());
|
||||
Exception e = expectThrows(MappingException.class, result::get);
|
||||
assertEquals(e.getMessage(), "[test] contains more than one type [first_type, second_type] so it is incompatible with sql");
|
||||
}
|
||||
|
||||
public void testIndexWithNoTypes() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test")))
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex("test");
|
||||
assertFalse(result.isValid());
|
||||
Exception e = expectThrows(MappingException.class, result::get);
|
||||
assertEquals(e.getMessage(), "[test] doesn't have any types so it is incompatible with sql");
|
||||
}
|
||||
|
||||
public void testIndexStartsWithDot() throws IOException {
|
||||
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index(".security")
|
||||
.putMapping("test", "{}"))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
GetIndexResult result = catalog.getIndex(".security");
|
||||
assertFalse(result.isValid());
|
||||
Exception e = expectThrows(MappingException.class, result::get);
|
||||
assertEquals(e.getMessage(), "[.security] starts with [.] so it is considered internal and incompatible with sql");
|
||||
}
|
||||
|
||||
private IndexMetaData.Builder index(String name) throws IOException {
|
||||
return IndexMetaData.builder(name)
|
||||
.settings(Settings.builder()
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 1));
|
||||
}
|
||||
}
|
@ -5,14 +5,12 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.analysis.catalog;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog.GetIndexResult;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
@ -45,7 +43,7 @@ public class FilteredCatalogTests extends ESTestCase {
|
||||
if (index.name().equals("cat")) {
|
||||
return delegateResult;
|
||||
}
|
||||
return GetIndexResult.valid(new EsIndex("rat", index.mapping(), index.aliases(), index.settings()));
|
||||
return GetIndexResult.valid(new EsIndex("rat", index.mapping()));
|
||||
});
|
||||
assertEquals(orig.getIndex("cat"), filtered.getIndex("cat"));
|
||||
assertEquals("rat", filtered.getIndex("dog").get().name());
|
||||
@ -64,7 +62,7 @@ public class FilteredCatalogTests extends ESTestCase {
|
||||
|
||||
private Catalog inMemoryCatalog(String... indexNames) {
|
||||
List<EsIndex> indices = Arrays.stream(indexNames)
|
||||
.map(i -> new EsIndex(i, singletonMap("field", null), emptyList(), Settings.EMPTY))
|
||||
.map(i -> new EsIndex(i, singletonMap("field", null)))
|
||||
.collect(toList());
|
||||
return new InMemoryCatalog(indices);
|
||||
}
|
||||
|
@ -5,98 +5,66 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.IndexResolver;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class SqlGetIndicesActionTests extends ESTestCase {
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);
|
||||
private final AtomicBoolean called = new AtomicBoolean(false);
|
||||
private final AtomicReference<Exception> error = new AtomicReference<>();
|
||||
|
||||
public void testOperation() throws IOException {
|
||||
public void testOperation() throws IOException, InterruptedException {
|
||||
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "test", "bar", "foo*");
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("test"))
|
||||
.put(index("foo1"))
|
||||
.put(index("foo2")))
|
||||
.build();
|
||||
|
||||
List<EsIndex> esIndices = new ArrayList<>();
|
||||
esIndices.add(new EsIndex("foo1", Collections.emptyMap()));
|
||||
esIndices.add(new EsIndex("foo2", Collections.emptyMap()));
|
||||
esIndices.add(new EsIndex("test", Collections.emptyMap()));
|
||||
|
||||
final AtomicReference<SqlGetIndicesAction.Response> responseRef = new AtomicReference<>();
|
||||
final AtomicReference<Exception> errorRef = new AtomicReference<>();
|
||||
|
||||
ActionListener<SqlGetIndicesAction.Response> listener = new ActionListener<SqlGetIndicesAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(SqlGetIndicesAction.Response response) {
|
||||
assertThat(response.indices(), hasSize(3));
|
||||
assertEquals("foo1", response.indices().get(0).name());
|
||||
assertEquals("foo2", response.indices().get(1).name());
|
||||
assertEquals("test", response.indices().get(2).name());
|
||||
called.set(true);
|
||||
responseRef.set(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
error.set(e);
|
||||
errorRef.set(e);
|
||||
}
|
||||
};
|
||||
SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener);
|
||||
if (error.get() != null) {
|
||||
throw new AssertionError(error.get());
|
||||
}
|
||||
assertTrue(called.get());
|
||||
}
|
||||
|
||||
public void testMultipleTypes() throws IOException {
|
||||
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "foo*");
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder()
|
||||
.put(index("foo1"))
|
||||
.put(index("foo2").putMapping("test2", "{}")))
|
||||
.build();
|
||||
final AtomicBoolean called = new AtomicBoolean(false);
|
||||
final AtomicReference<Exception> error = new AtomicReference<>();
|
||||
ActionListener<SqlGetIndicesAction.Response> listener = new ActionListener<SqlGetIndicesAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(SqlGetIndicesAction.Response response) {
|
||||
assertThat(response.indices(), hasSize(1));
|
||||
assertEquals("foo1", response.indices().get(0).name());
|
||||
called.set(true);
|
||||
}
|
||||
IndexResolver indexResolver = mock(IndexResolver.class);
|
||||
doAnswer((Answer<Void>) invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
final ActionListener<List<EsIndex>> actionListener = (ActionListener<List<EsIndex>>)invocationOnMock.getArguments()[0];
|
||||
actionListener.onResponse(esIndices);
|
||||
return null;
|
||||
}).when(indexResolver).asList(any(), Matchers.<String>anyVararg());
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
error.set(e);
|
||||
}
|
||||
};
|
||||
SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener);
|
||||
if (error.get() != null) {
|
||||
throw new AssertionError(error.get());
|
||||
}
|
||||
assertTrue(called.get());
|
||||
}
|
||||
SqlGetIndicesAction.operation(indexResolver, request, listener);
|
||||
|
||||
|
||||
IndexMetaData.Builder index(String name) throws IOException {
|
||||
return IndexMetaData.builder(name)
|
||||
.settings(Settings.builder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT))
|
||||
.putMapping("test", "{}");
|
||||
assertNull(errorRef.get());
|
||||
assertNotNull(responseRef.get());
|
||||
SqlGetIndicesAction.Response response = responseRef.get();
|
||||
assertThat(response.indices(), hasSize(3));
|
||||
assertEquals("foo1", response.indices().get(0).name());
|
||||
assertEquals("foo2", response.indices().get(1).name());
|
||||
assertEquals("test", response.indices().get(2).name());
|
||||
}
|
||||
}
|
||||
|
@ -37,8 +37,8 @@ public class SqlGetIndicesRequestTests extends AbstractWireSerializingTestCase<S
|
||||
Supplier<Request> supplier = randomFrom(
|
||||
() -> {
|
||||
Request mutant = new Request(
|
||||
randomValueOtherThan(request.indicesOptions(), SqlGetIndicesRequestTests::randomIndicesOptions),
|
||||
request.indices());
|
||||
randomValueOtherThan(request.indicesOptions(), SqlGetIndicesRequestTests::randomIndicesOptions),
|
||||
request.indices());
|
||||
mutant.local(request.local());
|
||||
mutant.masterNodeTimeout(request.masterNodeTimeout());
|
||||
mutant.setParentTask(request.getParentTask());
|
||||
@ -46,8 +46,8 @@ public class SqlGetIndicesRequestTests extends AbstractWireSerializingTestCase<S
|
||||
},
|
||||
() -> {
|
||||
Request mutant = new Request(
|
||||
request.indicesOptions(),
|
||||
randomValueOtherThanMany(i -> Arrays.equals(request.indices(), i), SqlGetIndicesRequestTests::randomIndices));
|
||||
request.indicesOptions(),
|
||||
randomValueOtherThanMany(i -> Arrays.equals(request.indices(), i), SqlGetIndicesRequestTests::randomIndices));
|
||||
mutant.local(request.local());
|
||||
mutant.masterNodeTimeout(request.masterNodeTimeout());
|
||||
mutant.setParentTask(request.getParentTask());
|
||||
|
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.plugin;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -26,7 +25,7 @@ public class SqlPluginTests extends ESTestCase {
|
||||
|
||||
public void testSqlDisabled() {
|
||||
SqlPlugin plugin = new SqlPlugin(false, new SqlLicenseChecker(() -> {}, () -> {}));
|
||||
assertThat(plugin.createComponents(mock(Client.class), mock(ClusterService.class), mock(FilteredCatalog.Filter.class)), empty());
|
||||
assertThat(plugin.createComponents(mock(Client.class), mock(FilteredCatalog.Filter.class)), empty());
|
||||
assertThat(plugin.getActions(), empty());
|
||||
assertThat(plugin.getRestHandlers(Settings.EMPTY, mock(RestController.class),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.session;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
|
||||
import org.elasticsearch.xpack.sql.session.SqlSession.SessionContext;
|
||||
|
||||
public class TestingSqlSession {
|
||||
|
||||
public static SessionContext ctx(Catalog catalog) {
|
||||
Configuration cfg = new Configuration(ESTestCase.randomDateTimeZone(), ESTestCase.between(1, 100),
|
||||
TimeValue.parseTimeValue(ESTestCase.randomPositiveTimeValue(), "test-random"),
|
||||
TimeValue.parseTimeValue(ESTestCase.randomPositiveTimeValue(), "test-random"), null);
|
||||
return new SessionContext(cfg, catalog);
|
||||
}
|
||||
|
||||
public static void setCurrentContext(SessionContext ctx) {
|
||||
assert SqlSession.CURRENT_CONTEXT.get() == null;
|
||||
SqlSession.CURRENT_CONTEXT.set(ctx);
|
||||
}
|
||||
|
||||
public static void removeCurrentContext() {
|
||||
SqlSession.CURRENT_CONTEXT.remove();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user