From 3f8bf7ccc8e5b2f39cfe64b09a24153592477d7c Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 8 Sep 2017 10:59:47 -0400 Subject: [PATCH] Integrate sql's metadata with security (elastic/x-pack-elasticsearch#2446) This integrates SQL's metadata calls with security by creating `SqlIndicesAction` and routing all of SQL's metadata calls through it. Since it *does* know up from which indices it is working against it can be an `IndicesRequest.Replaceable` and integrate with the existing security infrastructure for filtering indices. This request is implemented fairly similarly to the `GetIndexAction` with the option to read from the master or from a local copy of cluster state. Currently SQL forces it to run on the local copy because the request doesn't properly support serialization. I'd like to implement that in a followup. Original commit: elastic/x-pack-elasticsearch@15f95128207f1272e7cc71c383ebe7c30efdd89b --- .../elasticsearch/xpack/sql/SqlLicenseIT.java | 2 + .../elasticsearch/xpack/sql/TestUtils.java | 19 +- qa/sql/security/roles.yml | 9 +- .../xpack/sql/security/SqlSecurityIT.java | 231 +++++++++++++++-- .../xpack/sql/cli/CliProtoHandler.java | 4 +- .../sql/jdbc/framework/SqlProtoHandler.java | 4 +- .../xpack/sql/analysis/analyzer/Analyzer.java | 9 +- .../xpack/sql/analysis/catalog/Catalog.java | 13 +- .../xpack/sql/analysis/catalog/EsCatalog.java | 82 ++---- .../sql/analysis/catalog/FilteredCatalog.java | 9 - .../xpack/sql/execution/PlanExecutor.java | 35 ++- .../xpack/sql/optimizer/Optimizer.java | 8 - .../sql/plan/logical/command/Command.java | 2 +- .../sql/plan/logical/command/ShowColumns.java | 4 +- .../sql/plan/logical/command/ShowTables.java | 29 ++- .../xpack/sql/plugin/SqlGetIndicesAction.java | 244 ++++++++++++++++++ .../xpack/sql/plugin/SqlPlugin.java | 21 +- .../plugin/cli/action/TransportCliAction.java | 3 +- .../xpack/sql/plugin/jdbc/JdbcServer.java | 55 ++-- .../jdbc/action/TransportJdbcAction.java | 3 +- .../xpack/sql/session/SqlSession.java | 44 +++- .../sql/analysis/catalog/EsCatalogTests.java | 37 +-- .../sql/analysis/catalog/InMemoryCatalog.java | 13 - .../sql/plugin/SqlGetIndicesActionTests.java | 102 ++++++++ 24 files changed, 744 insertions(+), 238 deletions(-) create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesAction.java create mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesActionTests.java diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java index bafd7f96832..dfa9099fe31 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlLicenseIT.java @@ -135,6 +135,8 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase { assertThat(response, instanceOf(MetaTableResponse.class)); } + // TODO test SqlGetIndicesAction. Skipping for now because of lack of serialization support. + private void setupTestIndex() { assertAcked(client().admin().indices().prepareCreate("test").get()); client().prepareBulk() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/TestUtils.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/TestUtils.java index 3dae4ee1226..bcc62784a72 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/sql/TestUtils.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/TestUtils.java @@ -5,18 +5,29 @@ */ package org.elasticsearch.xpack.sql; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog; import org.elasticsearch.xpack.sql.execution.PlanExecutor; +import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction; + +import java.util.function.BiConsumer; public class TestUtils { - // NOCOMMIT I think these may not be needed if we switch to integration tests for the protos public static PlanExecutor planExecutor(Client client) { - EsCatalog catalog = new EsCatalog(() -> client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState()); - catalog.setIndexNameExpressionResolver(new IndexNameExpressionResolver(client.settings())); - PlanExecutor executor = new PlanExecutor(client, catalog); + BiConsumer> getIndices = (request, listener) -> { + ClusterState state = client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState(); + SqlGetIndicesAction.operation(new IndexNameExpressionResolver(Settings.EMPTY), EsCatalog::new, request, state, listener); + }; + PlanExecutor executor = new PlanExecutor( + client, + () -> client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState(), + getIndices, + EsCatalog::new); return executor; } } \ No newline at end of file diff --git a/qa/sql/security/roles.yml b/qa/sql/security/roles.yml index 7c6aa681306..0574de04d08 100644 --- a/qa/sql/security/roles.yml +++ b/qa/sql/security/roles.yml @@ -1,7 +1,9 @@ -read_test: +read_all: indices: - names: test privileges: [read] + - names: bort + privileges: [read] read_nothing: @@ -41,3 +43,8 @@ read_test_without_c_3: ] } } + +read_bort: + indices: + - names: bort + privileges: [read] diff --git a/qa/sql/security/src/test/java/org/elasticsearch/xpack/sql/security/SqlSecurityIT.java b/qa/sql/security/src/test/java/org/elasticsearch/xpack/sql/security/SqlSecurityIT.java index 1cd0ab0b4ca..65f39e05a34 100644 --- a/qa/sql/security/src/test/java/org/elasticsearch/xpack/sql/security/SqlSecurityIT.java +++ b/qa/sql/security/src/test/java/org/elasticsearch/xpack/sql/security/SqlSecurityIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; import org.hamcrest.Matcher; import org.junit.After; @@ -38,12 +39,15 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.sql.RestSqlTestCase.columnInfo; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItems; public class SqlSecurityIT extends ESRestTestCase { private static boolean oneTimeSetup = false; @@ -77,11 +81,13 @@ public class SqlSecurityIT extends ESRestTestCase { return; } StringBuilder bulk = new StringBuilder(); - bulk.append("{\"index\":{\"_id\":\"1\"}\n"); + bulk.append("{\"index\":{\"_index\": \"test\", \"_type\": \"doc\", \"_id\":\"1\"}\n"); bulk.append("{\"a\": 1, \"b\": 2, \"c\": 3}\n"); - bulk.append("{\"index\":{\"_id\":\"2\"}\n"); + bulk.append("{\"index\":{\"_index\": \"test\", \"_type\": \"doc\", \"_id\":\"2\"}\n"); bulk.append("{\"a\": 4, \"b\": 5, \"c\": 6}\n"); - client().performRequest("PUT", "/test/test/_bulk", singletonMap("refresh", "true"), + bulk.append("{\"index\":{\"_index\": \"bort\", \"_type\": \"doc\", \"_id\":\"1\"}\n"); + bulk.append("{\"a\": \"test\"}\n"); + client().performRequest("PUT", "/_bulk", singletonMap("refresh", "true"), new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON)); /* Wait for the audit log to go quiet and then clear it to protect * us from log events coming from other tests. */ @@ -140,9 +146,9 @@ public class SqlSecurityIT extends ESRestTestCase { // NOCOMMIT we're going to need to test jdbc and cli with these too! // NOCOMMIT we'll have to test scrolling as well - // NOCOMMIT tests for describing a table and showing tables + // NOCOMMIT assert that we don't have more audit logs then what we expect. - public void testSqlWorksAsAdmin() throws Exception { + public void testQueryWorksAsAdmin() throws Exception { Map expected = new HashMap<>(); expected.put("columns", Arrays.asList( columnInfo("a", "long"), @@ -153,18 +159,18 @@ public class SqlSecurityIT extends ESRestTestCase { Arrays.asList(4, 5, 6))); expected.put("size", 2); assertResponse(expected, runSql("SELECT * FROM test ORDER BY a", null)); - assertAuditForSqlGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); } - public void testSqlWithFullAccess() throws Exception { - createUser("full_access", "read_test"); + public void testQueryWithFullAccess() throws Exception { + createUser("full_access", "read_all"); assertResponse(runSql("SELECT * FROM test ORDER BY a", null), runSql("SELECT * FROM test ORDER BY a", "full_access")); - assertAuditForSqlGranted("test_admin", "test"); - assertAuditForSqlGranted("full_access", "test"); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("full_access", "test"); } - public void testSqlNoAccess() throws Exception { + public void testQueryNoAccess() throws Exception { createUser("no_access", "read_nothing"); ResponseException e = expectThrows(ResponseException.class, () -> runSql("SELECT * FROM test", "no_access")); @@ -174,7 +180,7 @@ public class SqlSecurityIT extends ESRestTestCase { && "no_access".equals(m.get("principal"))); } - public void testSqlWrongAccess() throws Exception { + public void testQueryWrongAccess() throws Exception { createUser("wrong_access", "read_something_else"); ResponseException e = expectThrows(ResponseException.class, () -> runSql("SELECT * FROM test", "wrong_access")); @@ -192,12 +198,12 @@ public class SqlSecurityIT extends ESRestTestCase { && "wrong_access".equals(m.get("principal"))); } - public void testSqlSingleFieldGranted() throws Exception { + public void testQuerySingleFieldGranted() throws Exception { createUser("only_a", "read_test_a"); assertResponse(runSql("SELECT a FROM test", null), runSql("SELECT * FROM test", "only_a")); - assertAuditForSqlGranted("test_admin", "test"); - assertAuditForSqlGranted("only_a", "test"); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("only_a", "test"); clearAuditEvents(); expectBadRequest(() -> runSql("SELECT c FROM test", "only_a"), containsString("line 1:8: Unresolved item 'c'")); /* The user has permission to query the index but one of the @@ -206,15 +212,15 @@ public class SqlSecurityIT extends ESRestTestCase { * query from the audit side because all the permissions checked * out but it failed in SQL because it couldn't compile the * query without the metadata for the missing field. */ - assertAuditForSqlGranted("only_a", "test"); + assertAuditForSqlGetTableSyncGranted("only_a", "test"); } - public void testSqlSingleFieldExcepted() throws Exception { + public void testQuerySingleFieldExcepted() throws Exception { createUser("not_c", "read_test_a_and_b"); assertResponse(runSql("SELECT a, b FROM test", null), runSql("SELECT * FROM test", "not_c")); - assertAuditForSqlGranted("test_admin", "test"); - assertAuditForSqlGranted("not_c", "test"); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("not_c", "test"); clearAuditEvents(); expectBadRequest(() -> runSql("SELECT c FROM test", "not_c"), containsString("line 1:8: Unresolved item 'c'")); /* The user has permission to query the index but one of the @@ -223,14 +229,164 @@ public class SqlSecurityIT extends ESRestTestCase { * query from the audit side because all the permissions checked * out but it failed in SQL because it couldn't compile the * query without the metadata for the missing field. */ - assertAuditForSqlGranted("not_c", "test"); + assertAuditForSqlGetTableSyncGranted("not_c", "test"); } - public void testSqlDocumentExclued() throws Exception { + public void testQueryDocumentExclued() throws Exception { createUser("no_3s", "read_test_without_c_3"); assertResponse(runSql("SELECT * FROM test WHERE c != 3", null), runSql("SELECT * FROM test", "no_3s")); - assertAuditForSqlGranted("no_3s", "test"); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("no_3s", "test"); + } + + public void testShowTablesWorksAsAdmin() throws Exception { + Map expected = new HashMap<>(); + expected.put("columns", singletonList(columnInfo("table", "keyword"))); + expected.put("rows", Arrays.asList( + singletonList("bort"), + singletonList("test"))); + expected.put("size", 2); + assertResponse(expected, runSql("SHOW TABLES", null)); + assertAuditEvents( + audit(true, SqlAction.NAME, "test_admin", null), + audit(true, SqlGetIndicesAction.NAME, "test_admin", hasItems("test", "bort"))); + } + + public void testShowTablesWorksAsFullAccess() throws Exception { + createUser("full_access", "read_all"); + + assertResponse(runSql("SHOW TABLES", null), runSql("SHOW TABLES", "full_access")); + assertAuditEvents( + audit(true, SqlAction.NAME, "test_admin", null), + audit(true, SqlGetIndicesAction.NAME, "test_admin", hasItems("test", "bort")), + audit(true, SqlAction.NAME, "full_access", null), + audit(true, SqlGetIndicesAction.NAME, "full_access", hasItems("test", "bort"))); + } + + public void testShowTablesWithNoAccess() throws Exception { + createUser("no_access", "read_nothing"); + + ResponseException e = expectThrows(ResponseException.class, () -> runSql("SHOW TABLES", "no_access")); + assertThat(e.getMessage(), containsString("403 Forbidden")); + assertAuditEvents(audit(false, SqlAction.NAME, "no_access", null)); + } + + public void testShowTablesWithLimitedAccess() throws Exception { + createUser("read_bort", "read_bort"); + + assertResponse(runSql("SHOW TABLES LIKE 'bort'", null), runSql("SHOW TABLES", "read_bort")); + assertAuditForSqlGetTableSyncGranted("test_admin", "bort"); + assertAuditEvents( + audit(true, SqlAction.NAME, "test_admin", null), + audit(true, SqlGetIndicesAction.NAME, "test_admin", contains("bort")), + audit(true, SqlAction.NAME, "read_bort", null), + audit(true, SqlGetIndicesAction.NAME, "read_bort", contains("bort"))); + } + + public void testShowTablesWithLimitedAccessAndPattern() throws Exception { + createUser("read_bort", "read_bort"); + + Map expected = new HashMap<>(); + expected.put("columns", singletonList(columnInfo("table", "keyword"))); + expected.put("rows", emptyList()); + expected.put("size", 0); + + assertResponse(expected, runSql("SHOW TABLES LIKE 'test'", "read_bort")); + assertAuditEvents( + audit(true, SqlAction.NAME, "read_bort", null), + audit(true, SqlGetIndicesAction.NAME, "read_bort", contains("*", "-*"))); + } + + public void testDescribeWorksAsAdmin() throws Exception { + Map expected = new HashMap<>(); + expected.put("columns", Arrays.asList( + columnInfo("column", "keyword"), + columnInfo("type", "keyword"))); + expected.put("rows", Arrays.asList( + Arrays.asList("a", "BIGINT"), + Arrays.asList("b", "BIGINT"), + Arrays.asList("c", "BIGINT"))); + expected.put("size", 3); + assertResponse(expected, runSql("DESCRIBE test", null)); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + } + + public void testDescribeWorksAsFullAccess() throws Exception { + createUser("full_access", "read_all"); + + assertResponse(runSql("DESCRIBE test", null), runSql("DESCRIBE test", "full_access")); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("full_access", "test"); + } + + public void testDescribeWithNoAccess() throws Exception { + createUser("no_access", "read_nothing"); + + ResponseException e = expectThrows(ResponseException.class, () -> runSql("DESCRIBE test", "no_access")); + assertThat(e.getMessage(), containsString("403 Forbidden")); + assertAuditEvents(m -> "access_denied".equals(m.get("event_type")) + && m.get("indices") == null + && "no_access".equals(m.get("principal"))); + } + + public void testDescribeWithWrongAccess() throws Exception { + createUser("wrong_access", "read_something_else"); + + ResponseException e = expectThrows(ResponseException.class, () -> runSql("DESCRIBE test", "wrong_access")); + assertThat(e.getMessage(), containsString("403 Forbidden")); + assertAuditEvents( + /* This user has permission to run sql queries so they are + * given preliminary authorization. */ + m -> "access_granted".equals(m.get("event_type")) + && null == m.get("indices") + && "wrong_access".equals(m.get("principal")), + /* But as soon as they attempt to resolve an index that + * they don't have access to they get denied. */ + m -> "access_denied".equals(m.get("event_type")) + && singletonList("test").equals(m.get("indices")) + && "wrong_access".equals(m.get("principal"))); + + } + + public void testDescribeSingleFieldGranted() throws Exception { + createUser("only_a", "read_test_a"); + + Map expected = new HashMap<>(); + expected.put("columns", Arrays.asList( + columnInfo("column", "keyword"), + columnInfo("type", "keyword"))); + expected.put("rows", singletonList(Arrays.asList("a", "BIGINT"))); + expected.put("size", 1); + + assertResponse(expected, runSql("DESCRIBE test", "only_a")); + assertAuditForSqlGetTableSyncGranted("only_a", "test"); + clearAuditEvents(); + } + + public void testDescribeSingleFieldExcepted() throws Exception { + createUser("not_c", "read_test_a_and_b"); + + Map expected = new HashMap<>(); + expected.put("columns", Arrays.asList( + columnInfo("column", "keyword"), + columnInfo("type", "keyword"))); + expected.put("rows", Arrays.asList( + Arrays.asList("a", "BIGINT"), + Arrays.asList("b", "BIGINT"))); + expected.put("size", 2); + + assertResponse(expected, runSql("DESCRIBE test", "not_c")); + assertAuditForSqlGetTableSyncGranted("not_c", "test"); + clearAuditEvents(); + } + + public void testDescribeDocumentExclued() throws Exception { + createUser("no_3s", "read_test_without_c_3"); + + assertResponse(runSql("DESCRIBE test", null), runSql("DESCRIBE test", "no_3s")); + assertAuditForSqlGetTableSyncGranted("test_admin", "test"); + assertAuditForSqlGetTableSyncGranted("no_3s", "test"); } private void expectBadRequest(ThrowingRunnable code, Matcher errorMessageMatcher) { @@ -271,12 +427,14 @@ public class SqlSecurityIT extends ESRestTestCase { new StringEntity(user.string(), ContentType.APPLICATION_JSON)); } - private void assertAuditForSqlGranted(String user, String index) throws Exception { + private void assertAuditForSqlGetTableSyncGranted(String user, String index) throws Exception { assertAuditEvents( m -> "access_granted".equals(m.get("event_type")) + && SqlAction.NAME.equals(m.get("action")) && m.get("indices") == null && user.equals(m.get("principal")), m -> "access_granted".equals(m.get("event_type")) + && SqlAction.NAME.equals(m.get("action")) && singletonList(index).equals(m.get("indices")) && user.equals(m.get("principal"))); } @@ -292,9 +450,21 @@ public class SqlSecurityIT extends ESRestTestCase { assertBusy(() -> { XContentBuilder search = JsonXContent.contentBuilder().prettyPrint(); search.startObject(); { - search.array("_source", "@timestamp", "indices", "principal", "event_type"); search.startObject("query"); { - search.startObject("match").field("action", SqlAction.NAME).endObject(); + search.startObject("bool"); { + search.startArray("should"); { + search.startObject(); { + search.startObject("match").field("action", SqlAction.NAME).endObject(); + } + search.endObject(); + search.startObject(); { + search.startObject("match").field("action", SqlGetIndicesAction.NAME).endObject(); + } + search.endObject(); + } + search.endArray(); + } + search.endObject(); } search.endObject(); } @@ -341,6 +511,15 @@ public class SqlSecurityIT extends ESRestTestCase { } } + private CheckedFunction, Boolean, Exception> audit(boolean granted, String action, + String principal, Matcher> indicesMatcher) { + String eventType = granted ? "access_granted" : "access_denied"; + return m -> eventType.equals(m.get("event_type")) + && action.equals(m.get("action")) + && principal.equals(m.get("principal")) + && (indicesMatcher == null ? false == m.containsKey("indices") : indicesMatcher.matches(m.get("indices"))); + } + private void clearAuditEvents() throws Exception { try { assertBusy(() -> { @@ -360,4 +539,4 @@ public class SqlSecurityIT extends ESRestTestCase { throw e; } } -} +} \ No newline at end of file diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java index b97dd89dc38..c3cc2001db6 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/CliProtoHandler.java @@ -28,8 +28,8 @@ class CliProtoHandler extends ProtoHandler { CliProtoHandler(Client client) { super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response)); - this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(), - info.getBuild()); + this.server = new CliServer(TestUtils.planExecutor(client), clusterName, + () -> info.getNode().getName(), info.getVersion(), info.getBuild()); } @Override diff --git a/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/framework/SqlProtoHandler.java b/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/framework/SqlProtoHandler.java index cb2dcbd482f..becff27fc1d 100644 --- a/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/framework/SqlProtoHandler.java +++ b/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/framework/SqlProtoHandler.java @@ -28,8 +28,8 @@ class SqlProtoHandler extends ProtoHandler { SqlProtoHandler(Client client) { super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response)); - this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(), - info.getBuild()); + this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName, + () -> info.getNode().getName(), info.getVersion(), info.getBuild()); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java index 9f70d114e56..d37b526cc46 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.sql.analysis.AnalysisException; import org.elasticsearch.xpack.sql.analysis.UnknownFunctionException; import org.elasticsearch.xpack.sql.analysis.UnknownIndexException; import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier.Failure; -import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; import org.elasticsearch.xpack.sql.capabilities.Resolvables; import org.elasticsearch.xpack.sql.expression.Alias; @@ -71,11 +70,11 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine; public class Analyzer extends RuleExecutor { - private final Catalog catalog; + private final SqlSession session; private final FunctionRegistry functionRegistry; - public Analyzer(Catalog catalog, FunctionRegistry functionRegistry) { - this.catalog = catalog; + public Analyzer(SqlSession session, FunctionRegistry functionRegistry) { + this.session = session; this.functionRegistry = functionRegistry; } @@ -243,7 +242,7 @@ public class Analyzer extends RuleExecutor { TableIdentifier table = plan.table(); EsIndex found; try { - found = catalog.getIndex(table.index()); + found = session.getIndexSync(table.index()); } catch (SqlIllegalArgumentException e) { throw new AnalysisException(plan, e.getMessage(), e); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/Catalog.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/Catalog.java index 793d2c5b72b..d0225bc9508 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/Catalog.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/Catalog.java @@ -5,20 +5,21 @@ */ package org.elasticsearch.xpack.sql.analysis.catalog; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Nullable; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; -import java.util.List; - - +/** + * Converts from Elasticsearch's internal metadata ({@link ClusterState}) + * into a representation that is compatible with SQL (@{link {@link EsIndex}). + */ public interface Catalog { /** * Lookup the information for a table, returning {@code null} if * the index is not found. - * @throws SqlIllegalArgumentException if the index is in some way invalid for use with sql + * @throws SqlIllegalArgumentException if the index is in some way invalid + * for use with SQL */ @Nullable EsIndex getIndex(String index) throws SqlIllegalArgumentException; - - List listIndices(String pattern); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalog.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalog.java index 460cfcb8030..d53b0ae30f5 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalog.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalog.java @@ -7,99 +7,58 @@ package org.elasticsearch.xpack.sql.analysis.catalog; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.function.Supplier; public class EsCatalog implements Catalog { + private final ClusterState clusterState; - private final Supplier clusterState; - private IndexNameExpressionResolver indexNameExpressionResolver; - - public EsCatalog(Supplier clusterState) { + public EsCatalog(ClusterState clusterState) { this.clusterState = clusterState; } - @Inject // NOCOMMIT more to ctor and move resolver to createComponents - public void setIndexNameExpressionResolver(IndexNameExpressionResolver resolver) { - this.indexNameExpressionResolver = resolver; - } - - private MetaData metadata() { - return clusterState.get().getMetaData(); - } - @Override public EsIndex getIndex(String index) throws SqlIllegalArgumentException { - MetaData metadata = metadata(); - IndexMetaData idx = metadata.index(index); + IndexMetaData idx = clusterState.getMetaData().index(index); if (idx == null) { return null; } - return EsIndex.build(idx, singleType(idx, false)); - } - - @Override - public List listIndices(String pattern) { - Iterator indexMetadata = null; - MetaData md = metadata(); - if (pattern == null) { - indexMetadata = md.indices().valuesIt(); + if (idx.getIndex().getName().startsWith(".")) { + /* Indices that start with "." are considered internal and + * should not be available to SQL. */ + return null; } - else { - String[] indexNames = resolveIndex(pattern); - List indices = new ArrayList<>(indexNames.length); - for (String indexName : indexNames) { - indices.add(md.index(indexName)); - } - indexMetadata = indices.iterator(); + MappingMetaData type = singleType(idx.getMappings(), idx.getIndex().getName()); + if (type == null) { + return null; } - - List list = new ArrayList<>(); - // filter unsupported (indices with more than one type) indices - while (indexMetadata.hasNext()) { - IndexMetaData imd = indexMetadata.next(); - MappingMetaData type = singleType(imd, true); - if (type != null) { - list.add(EsIndex.build(imd, type)); - } - } - - return list; + return EsIndex.build(idx, type); } /** - * Return the single type in the index of {@code null} if there - * are no types in the index. - * @param badIndicesAreNull if true then return null for indices with - * more than one type, if false throw an exception for such indices + * Return the single type in the index, {@code null} if there + * are no types in the index, and throw a {@link SqlIllegalArgumentException} + * if there are multiple types in the index. */ @Nullable - private MappingMetaData singleType(IndexMetaData index, boolean badIndicesAreNull) { + public MappingMetaData singleType(ImmutableOpenMap mappings, String name) { /* We actually ignore the _default_ mapping because it is still * allowed but deprecated. */ MappingMetaData result = null; List typeNames = null; - for (ObjectObjectCursor type : index.getMappings()) { + for (ObjectObjectCursor type : mappings) { if ("_default_".equals(type.key)) { continue; } if (result != null) { - if (badIndicesAreNull) { - return null; - } if (typeNames == null) { typeNames = new ArrayList<>(); typeNames.add(result.type()); @@ -112,11 +71,6 @@ public class EsCatalog implements Catalog { return result; } Collections.sort(typeNames); - throw new IllegalArgumentException("[" + index.getIndex().getName() + "] has more than one type " + typeNames); - } - - private String[] resolveIndex(String pattern) { - // NOCOMMIT we should use the cluster state that we resolve when we fetch the metadata so it is the *same* so we don't have weird errors when indices are deleted - return indexNameExpressionResolver.concreteIndexNames(clusterState.get(), IndicesOptions.strictExpandOpenAndForbidClosed(), pattern); + throw new SqlIllegalArgumentException("[" + name + "] has more than one type " + typeNames); } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/FilteredCatalog.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/FilteredCatalog.java index aab8b1bc5ec..16b5f2c254d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/FilteredCatalog.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/analysis/catalog/FilteredCatalog.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.sql.analysis.catalog; -import java.util.List; - /** * {@link Catalog} implementation that filters the results. */ @@ -28,15 +26,8 @@ public class FilteredCatalog implements Catalog { this.filter = filter; } - @Override - public List listIndices(String pattern) { - // NOCOMMIT authorize me - return delegate.listIndices(pattern); - } - @Override public EsIndex getIndex(String index) { - // NOCOMMIT we need to think really carefully about how we deal with aliases that resolve into multiple indices. EsIndex result = delegate.getIndex(index); if (result == null) { return null; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 631662490b6..9e9992e5144 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -7,45 +7,56 @@ package org.elasticsearch.xpack.sql.execution; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; import org.elasticsearch.xpack.sql.expression.function.DefaultFunctionRegistry; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.optimizer.Optimizer; import org.elasticsearch.xpack.sql.parser.SqlParser; import org.elasticsearch.xpack.sql.planner.Planner; +import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSetCursor; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSettings; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + public class PlanExecutor { private final Client client; + private final Supplier stateSupplier; + /** + * The way that we resolve indices asynchronously. This must + * be passed in to support embedded mode. Otherwise we could + * use the {@link #client} directly. + */ + private final BiConsumer> getIndices; + private final Function catalogSupplier; private final SqlParser parser; - private Catalog catalog; private final FunctionRegistry functionRegistry; - private final Analyzer analyzer; private final Optimizer optimizer; private final Planner planner; - public PlanExecutor(Client client, Catalog catalog) { + public PlanExecutor(Client client, Supplier stateSupplier, + BiConsumer> getIndices, + Function catalogSupplier) { this.client = client; - this.catalog = catalog; + this.stateSupplier = stateSupplier; + this.getIndices = getIndices; + this.catalogSupplier = catalogSupplier; this.parser = new SqlParser(); this.functionRegistry = new DefaultFunctionRegistry(); - this.analyzer = new Analyzer(catalog, functionRegistry); - this.optimizer = new Optimizer(catalog); + this.optimizer = new Optimizer(); this.planner = new Planner(); } - public Catalog catalog() { - return catalog; - } - public SqlSession newSession(SqlSettings settings) { - return new SqlSession(settings, client, parser, catalog, functionRegistry, analyzer, optimizer, planner); + return new SqlSession(settings, client, getIndices, catalogSupplier.apply(stateSupplier.get()), parser, + functionRegistry, optimizer, planner); } public void sql(String sql, ActionListener listener) { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java index 29870496ccc..dc89c6288cf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.optimizer; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; -import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; import org.elasticsearch.xpack.sql.expression.Alias; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.AttributeSet; @@ -78,13 +77,6 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine; public class Optimizer extends RuleExecutor { - - private final Catalog catalog; - - public Optimizer(Catalog catalog) { - this.catalog = catalog; - } - public ExecutionInfo debugOptimize(LogicalPlan verified) { return verified.optimized() ? null : executeWithInfo(verified); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java index 6f1eefe309b..210a8030c55 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java @@ -26,7 +26,7 @@ public abstract class Command extends LogicalPlan implements Executable { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(execute(session)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java index fdd5853bfe5..3cd12fd3933 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.plan.logical.command; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; -import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; @@ -47,11 +46,10 @@ public class ShowColumns extends Command { @Override protected RowSetCursor execute(SqlSession session) { - Catalog catalog = session.catalog(); List> rows = new ArrayList<>(); EsIndex fetched; try { - fetched = catalog.getIndex(index); + fetched = session.getIndexSync(index); } catch (SqlIllegalArgumentException e) { throw new IllegalArgumentException(e); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java index 41d9608b9f8..2e8e3fdfffc 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java @@ -5,7 +5,10 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.session.RowSetCursor; @@ -13,21 +16,21 @@ import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; +import org.elasticsearch.xpack.sql.util.StringUtils; -import java.util.Collections; import java.util.List; import java.util.Objects; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static java.util.Comparator.comparing; import static java.util.stream.Collectors.toList; public class ShowTables extends Command { + @Nullable private final String pattern; - public ShowTables(Location location, String pattern) { + public ShowTables(Location location, @Nullable String pattern) { super(location); this.pattern = pattern; } @@ -42,14 +45,18 @@ public class ShowTables extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { - List indices = session.catalog().listIndices(pattern); - // Consistent sorting is nice both for testing and humans - Collections.sort(indices, comparing(EsIndex::name)); - - return Rows.of(output(), indices.stream() + public final void execute(SqlSession session, ActionListener listener) { + String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*"; + session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { + listener.onResponse(Rows.of(output(), result.stream() .map(t -> singletonList(t.name())) - .collect(toList())); + .collect(toList()))); + }, listener::onFailure)); + } + + @Override + protected RowSetCursor execute(SqlSession session) { + throw new UnsupportedOperationException("No synchronous exec"); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesAction.java new file mode 100644 index 00000000000..d91f8b1d720 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesAction.java @@ -0,0 +1,244 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; +import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; +import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static java.util.Comparator.comparing; + +public class SqlGetIndicesAction + extends Action { + public static final SqlGetIndicesAction INSTANCE = new SqlGetIndicesAction(); + public static final String NAME = "indices:data/read/sql/tables"; + + private SqlGetIndicesAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + private IndicesOptions indicesOptions; + private String[] indices; + + /** + * Deserialization and builder ctor. + */ + Request() {} + + /** + * Sensible ctor. + */ + public Request(IndicesOptions indicesOptions, String... indices) { + this.indicesOptions = indicesOptions; + this.indices = indices; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indicesOptions = IndicesOptions.readIndicesOptions(in); + indices = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + indicesOptions.writeIndicesOptions(out); + out.writeStringArray(indices); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public Request indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public Request indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + } + + public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder { + public RequestBuilder(ElasticsearchClient client, SqlGetIndicesAction action) { + super(client, action, new Request()); + } + + RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { + request.indicesOptions(indicesOptions); + return this; + } + + RequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + } + + public static class Response extends ActionResponse { + private List indices; + + /** + * Deserialization ctor. + */ + Response() {} + + public Response(List indices) { + this.indices = indices; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + throw new UnsupportedOperationException("Must be requested locally for now"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + throw new UnsupportedOperationException("Must be requested locally for now"); + } + + public List indices() { + return indices; + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + private final Function catalog; + private final SqlLicenseChecker licenseChecker; + + @Inject + public TransportAction(Settings settings, TransportService transportService, + ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, CatalogHolder catalog, SqlLicenseChecker licenseChecker) { + super(settings, NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.catalog = catalog.catalog; + this.licenseChecker = licenseChecker; + } + + @Override + protected String executor() { + // read operation, lightweight... + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) { + licenseChecker.checkIfSqlAllowed(); + operation(indexNameExpressionResolver, catalog, request, state, listener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, + indexNameExpressionResolver.concreteIndexNames(state, request)); + } + + /** + * Class that holds that {@link Catalog} to aid in guice binding. + */ + public static class CatalogHolder { + final Function catalog; + + public CatalogHolder(Function catalog) { + this.catalog = catalog; + } + } + } + + /** + * Actually looks up the indices in the cluster state and converts + * them into {@link EsIndex} instances. The rest of the contents of + * this class integrates this behavior cleanly into Elasticsearch, + * makes sure that we only try and read the cluster state when it is + * ready, integrate with security to filter the requested indices to + * what the user has permission to access, and leaves an appropriate + * audit trail. + */ + public static void operation(IndexNameExpressionResolver indexNameExpressionResolver, Function catalog, + Request request, ClusterState state, ActionListener listener) { + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request); + List results = new ArrayList<>(concreteIndices.length); + for (String index : concreteIndices) { + EsIndex esIndex; + try { + esIndex = catalog.apply(state).getIndex(index); + } catch (SqlIllegalArgumentException e) { + assert e.getMessage().contains("has more than one type"); + esIndex = null; + } + if (esIndex != null) { + results.add(esIndex); + } + } + + // Consistent sorting is better for testing and for humans + Collections.sort(results, comparing(EsIndex::name)); + + listener.onResponse(new Response(results)); + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java index 82554e32b40..1789eeb29ce 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java @@ -5,9 +5,11 @@ */ package org.elasticsearch.xpack.sql.plugin; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -24,6 +26,7 @@ import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog; import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog; import org.elasticsearch.xpack.sql.execution.PlanExecutor; +import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder; import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction; import org.elasticsearch.xpack.sql.plugin.cli.action.CliHttpHandler; import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction; @@ -38,6 +41,8 @@ import org.elasticsearch.xpack.sql.session.Cursor; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.Supplier; public class SqlPlugin implements ActionPlugin { @@ -57,12 +62,17 @@ public class SqlPlugin implements ActionPlugin { */ public Collection createComponents(Client client, ClusterService clusterService, @Nullable FilteredCatalog.Filter catalogFilter) { - EsCatalog esCatalog = new EsCatalog(() -> clusterService.state()); - Catalog catalog = catalogFilter == null ? esCatalog : new FilteredCatalog(esCatalog, catalogFilter); + Function catalog = EsCatalog::new; + if (catalogFilter != null) { + catalog = catalog.andThen(c -> new FilteredCatalog(c, catalogFilter)); + } + BiConsumer> getIndices = (request, listener) -> { + client.execute(SqlGetIndicesAction.INSTANCE, request, listener); + }; return Arrays.asList( - esCatalog, // Added as a component so that it can get IndexNameExpressionResolver injected. + new CatalogHolder(catalog), sqlLicenseChecker, - new PlanExecutor(client, catalog)); + new PlanExecutor(client, clusterService::state, getIndices, catalog)); } @Override @@ -79,6 +89,7 @@ public class SqlPlugin implements ActionPlugin { public List> getActions() { return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class), new ActionHandler<>(CliAction.INSTANCE, TransportCliAction.class), - new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class)); + new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class), + new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java index 5194e58cd2a..ec2a6b08d14 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/cli/action/TransportCliAction.java @@ -38,7 +38,8 @@ public class TransportCliAction extends HandledTransportAction clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); + this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), + () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java index e3a2e377dbc..a0f475f6daf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/JdbcServer.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.plugin.jdbc; import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; @@ -37,7 +38,6 @@ import org.elasticsearch.xpack.sql.util.StringUtils; import java.sql.JDBCType; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.function.Supplier; @@ -66,10 +66,10 @@ public class JdbcServer extends AbstractSqlServer { listener.onResponse(info((InfoRequest) req)); break; case META_TABLE: - listener.onResponse(metaTable((MetaTableRequest) req)); + metaTable((MetaTableRequest) req, listener); break; case META_COLUMN: - listener.onResponse(metaColumn((MetaColumnRequest) req)); + metaColumn((MetaColumnRequest) req, listener); break; case QUERY_INIT: queryInit((QueryInitRequest) req, listener); @@ -97,38 +97,39 @@ public class JdbcServer extends AbstractSqlServer { return infoResponse.get(); } - public MetaTableResponse metaTable(MetaTableRequest req) { + public void metaTable(MetaTableRequest req, ActionListener listener) { String indexPattern = hasText(req.pattern()) ? StringUtils.jdbcToEsPattern(req.pattern()) : "*"; - Collection indices = executor.catalog().listIndices(indexPattern); - return new MetaTableResponse(indices.stream() - .map(EsIndex::name) - .collect(toList())); + executor.newSession(SqlSettings.EMPTY) + .getIndices(new String[] {indexPattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { + listener.onResponse(new MetaTableResponse(result.stream() + .map(EsIndex::name) + .collect(toList()))); + }, listener::onFailure)); } - public MetaColumnResponse metaColumn(MetaColumnRequest req) { + public void metaColumn(MetaColumnRequest req, ActionListener listener) { String pattern = Strings.hasText(req.tablePattern()) ? StringUtils.jdbcToEsPattern(req.tablePattern()) : "*"; - - Collection indices = executor.catalog().listIndices(pattern); - Pattern columnMatcher = hasText(req.columnPattern()) ? StringUtils.likeRegex(req.columnPattern()) : null; - List resp = new ArrayList<>(); - for (EsIndex esIndex : indices) { - int pos = 0; - for (Entry entry : esIndex.mapping().entrySet()) { - pos++; - if (columnMatcher == null || columnMatcher.matcher(entry.getKey()).matches()) { - String name = entry.getKey(); - String table = esIndex.name(); - JDBCType tp = entry.getValue().sqlType(); - int size = entry.getValue().precision(); - resp.add(new MetaColumnInfo(table, name, tp, size, pos)); + executor.newSession(SqlSettings.EMPTY) + .getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { + List resp = new ArrayList<>(); + for (EsIndex esIndex : result) { + int pos = 0; + for (Entry entry : esIndex.mapping().entrySet()) { + pos++; + if (columnMatcher == null || columnMatcher.matcher(entry.getKey()).matches()) { + String name = entry.getKey(); + String table = esIndex.name(); + JDBCType tp = entry.getValue().sqlType(); + int size = entry.getValue().precision(); + resp.add(new MetaColumnInfo(table, name, tp, size, pos)); + } + } } - } - } - - return new MetaColumnResponse(resp); + listener.onResponse(new MetaColumnResponse(resp)); + }, listener::onFailure)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java index 005585d1d3b..0fab807e40a 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/jdbc/action/TransportJdbcAction.java @@ -38,7 +38,8 @@ public class TransportJdbcAction extends HandledTransportAction clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); + this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(), + () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index 0591648c0a7..5f03e016a6d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -6,10 +6,12 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer; import org.elasticsearch.xpack.sql.analysis.catalog.Catalog; +import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; import org.elasticsearch.xpack.sql.expression.Expression; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.optimizer.Optimizer; @@ -17,17 +19,20 @@ import org.elasticsearch.xpack.sql.parser.SqlParser; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; +import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; public class SqlSession { private final Client client; + private final BiConsumer> sqlGetIndicesAction; + private final Catalog catalog; private final SqlParser parser; - private final Catalog catalog; private final FunctionRegistry functionRegistry; - private final Analyzer analyzer; private final Optimizer optimizer; private final Planner planner; @@ -36,7 +41,6 @@ public class SqlSession { // thread-local used for sharing settings across the plan compilation public static final ThreadLocal CURRENT = new ThreadLocal() { - @Override public String toString() { return "SQL Session"; @@ -44,18 +48,20 @@ public class SqlSession { }; public SqlSession(SqlSession other) { - this(other.defaults(), other.client(), other.parser, other.catalog(), other.functionRegistry(), other.analyzer(), other.optimizer(), other.planner()); + this(other.defaults(), other.client(), other.sqlGetIndicesAction, other.catalog(), other.parser, + other.functionRegistry(), other.optimizer(), other.planner()); } - public SqlSession(SqlSettings defaults, - Client client, SqlParser parser, Catalog catalog, - FunctionRegistry functionRegistry, Analyzer analyzer, Optimizer optimizer, Planner planner) { + public SqlSession(SqlSettings defaults, Client client, + BiConsumer> sqlGetIndicesAction, + Catalog catalog, SqlParser parser, FunctionRegistry functionRegistry, Optimizer optimizer, + Planner planner) { this.client = client; + this.sqlGetIndicesAction = sqlGetIndicesAction; + this.catalog = catalog; this.parser = parser; - this.catalog = catalog; this.functionRegistry = functionRegistry; - this.analyzer = analyzer; this.optimizer = optimizer; this.planner = planner; @@ -75,12 +81,29 @@ public class SqlSession { return client; } + /** + * Get the indices matching a pattern. Prefer this method if possible. + */ + public void getIndices(String[] patterns, IndicesOptions options, ActionListener> listener) { + SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(options, patterns).local(true); + sqlGetIndicesAction.accept(request, ActionListener.wrap(response -> { + listener.onResponse(response.indices()); + }, listener::onFailure)); + } + + /** + * Get an index. Prefer not to use this method as it cannot be made to work with cross cluster search. + */ + public EsIndex getIndexSync(String index) { + return catalog.getIndex(index); + } + public Planner planner() { return planner; } public Analyzer analyzer() { - return analyzer; + return new Analyzer(this, functionRegistry); } public Optimizer optimizer() { @@ -96,6 +119,7 @@ public class SqlSession { } public LogicalPlan analyzedPlan(LogicalPlan plan, boolean verify) { + Analyzer analyzer = analyzer(); return verify ? analyzer.verify(analyzer.analyze(plan)) : analyzer.analyze(plan); } diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalogTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalogTests.java index ce64cd24597..ac25c5e3ac7 100644 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalogTests.java +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/EsCatalogTests.java @@ -9,75 +9,58 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import java.io.IOException; -import java.util.List; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static org.hamcrest.Matchers.hasSize; public class EsCatalogTests extends ESTestCase { public void testEmpty() { - EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT).build()); - assertEquals(emptyList(), catalog.listIndices("*")); + Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT).build()); assertNull(catalog.getIndex("test")); } public void testIndexExists() throws IOException { - EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT) + Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT) .metaData(MetaData.builder() - .put(index("test") + .put(index() .putMapping("test", "{}")) .build()) .build()); - List indices = catalog.listIndices("*"); - assertThat(indices, hasSize(1)); - assertEquals("test", indices.get(0).name()); assertEquals(emptyMap(), catalog.getIndex("test").mapping()); } public void testIndexWithDefaultType() throws IOException { - EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT) + Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT) .metaData(MetaData.builder() - .put(index("test") + .put(index() .putMapping("test", "{}") .putMapping("_default_", "{}")) .build()) .build()); - List indices = catalog.listIndices("*"); - assertThat(indices, hasSize(1)); - assertEquals("test", indices.get(0).name()); assertEquals(emptyMap(), catalog.getIndex("test").mapping()); } public void testIndexWithTwoTypes() throws IOException { - EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT) + Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT) .metaData(MetaData.builder() - .put(index("test") + .put(index() .putMapping("first_type", "{}") .putMapping("second_type", "{}")) .build()) .build()); - assertEquals(emptyList(), catalog.listIndices("*")); - Exception e = expectThrows(IllegalArgumentException.class, () -> catalog.getIndex("test")); + Exception e = expectThrows(SqlIllegalArgumentException.class, () -> catalog.getIndex("test")); assertEquals(e.getMessage(), "[test] has more than one type [first_type, second_type]"); } - private EsCatalog catalog(ClusterState state) { - EsCatalog catalog = new EsCatalog(() -> state); - catalog.setIndexNameExpressionResolver(new IndexNameExpressionResolver(Settings.EMPTY)); - return catalog; - } - - private IndexMetaData.Builder index(String name) throws IOException { + private IndexMetaData.Builder index() throws IOException { return IndexMetaData.builder("test") .settings(Settings.builder() .put("index.version.created", Version.CURRENT) diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/InMemoryCatalog.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/InMemoryCatalog.java index e75721d3706..19dbe984f50 100644 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/InMemoryCatalog.java +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/analysis/catalog/InMemoryCatalog.java @@ -5,13 +5,9 @@ */ package org.elasticsearch.xpack.sql.analysis.catalog; -import org.elasticsearch.xpack.sql.util.StringUtils; - import java.util.List; import java.util.Map; import java.util.function.Function; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import static java.util.stream.Collectors.toMap; @@ -25,15 +21,6 @@ class InMemoryCatalog implements Catalog { this.indices = indices.stream().collect(toMap(EsIndex::name, Function.identity())); } - @Override - public List listIndices(String pattern) { - Pattern p = StringUtils.likeRegex(pattern); - return indices.entrySet().stream() - .filter(e -> p.matcher(e.getKey()).matches()) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); - } - @Override public EsIndex getIndex(String index) { return indices.get(index); diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesActionTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesActionTests.java new file mode 100644 index 00000000000..25948bb7bef --- /dev/null +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlGetIndicesActionTests.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.hamcrest.Matchers.hasSize; + +public class SqlGetIndicesActionTests extends ESTestCase { + private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY); + private final AtomicBoolean called = new AtomicBoolean(false); + private final AtomicReference error = new AtomicReference<>(); + + public void testOperation() throws IOException { + SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "test", "bar", "foo*"); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder() + .put(index("test")) + .put(index("foo1")) + .put(index("foo2"))) + .build(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SqlGetIndicesAction.Response response) { + assertThat(response.indices(), hasSize(3)); + assertEquals("foo1", response.indices().get(0).name()); + assertEquals("foo2", response.indices().get(1).name()); + assertEquals("test", response.indices().get(2).name()); + called.set(true); + } + + @Override + public void onFailure(Exception e) { + error.set(e); + } + }; + SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener); + if (error.get() != null) { + throw new AssertionError(error.get()); + } + assertTrue(called.get()); + } + + public void testMultipleTypes() throws IOException { + SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "foo*"); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder() + .put(index("foo1")) + .put(index("foo2").putMapping("test2", "{}"))) + .build(); + final AtomicBoolean called = new AtomicBoolean(false); + final AtomicReference error = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SqlGetIndicesAction.Response response) { + assertThat(response.indices(), hasSize(1)); + assertEquals("foo1", response.indices().get(0).name()); + called.set(true); + } + + @Override + public void onFailure(Exception e) { + error.set(e); + } + }; + SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener); + if (error.get() != null) { + throw new AssertionError(error.get()); + } + assertTrue(called.get()); + } + + + IndexMetaData.Builder index(String name) throws IOException { + return IndexMetaData.builder(name) + .settings(Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_VERSION_CREATED, Version.CURRENT)) + .putMapping("test", "{}"); + } +}