Integrate sql's metadata with security (elastic/x-pack-elasticsearch#2446)

This integrates SQL's metadata calls with security by creating
`SqlIndicesAction` and routing all of SQL's metadata calls through
it. Since it *does* know up from which indices it is working against
it can be an `IndicesRequest.Replaceable` and integrate with the
existing security infrastructure for filtering indices.

This request is implemented fairly similarly to the `GetIndexAction`
with the option to read from the master or from a local copy of
cluster state. Currently SQL forces it to run on the local copy
because the request doesn't properly support serialization. I'd
like to implement that in a followup.

Original commit: elastic/x-pack-elasticsearch@15f9512820
This commit is contained in:
Nik Everett 2017-09-08 10:59:47 -04:00 committed by GitHub
parent be32241263
commit 3f8bf7ccc8
24 changed files with 744 additions and 238 deletions

View File

@ -135,6 +135,8 @@ public class SqlLicenseIT extends AbstractLicensesIntegrationTestCase {
assertThat(response, instanceOf(MetaTableResponse.class));
}
// TODO test SqlGetIndicesAction. Skipping for now because of lack of serialization support.
private void setupTestIndex() {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()

View File

@ -5,18 +5,29 @@
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import java.util.function.BiConsumer;
public class TestUtils {
// NOCOMMIT I think these may not be needed if we switch to integration tests for the protos
public static PlanExecutor planExecutor(Client client) {
EsCatalog catalog = new EsCatalog(() -> client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState());
catalog.setIndexNameExpressionResolver(new IndexNameExpressionResolver(client.settings()));
PlanExecutor executor = new PlanExecutor(client, catalog);
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices = (request, listener) -> {
ClusterState state = client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState();
SqlGetIndicesAction.operation(new IndexNameExpressionResolver(Settings.EMPTY), EsCatalog::new, request, state, listener);
};
PlanExecutor executor = new PlanExecutor(
client,
() -> client.admin().cluster().prepareState().get(TimeValue.timeValueMinutes(1)).getState(),
getIndices,
EsCatalog::new);
return executor;
}
}

View File

@ -1,7 +1,9 @@
read_test:
read_all:
indices:
- names: test
privileges: [read]
- names: bort
privileges: [read]
read_nothing:
@ -41,3 +43,8 @@ read_test_without_c_3:
]
}
}
read_bort:
indices:
- names: bort
privileges: [read]

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.hamcrest.Matcher;
import org.junit.After;
@ -38,12 +39,15 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.sql.RestSqlTestCase.columnInfo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItems;
public class SqlSecurityIT extends ESRestTestCase {
private static boolean oneTimeSetup = false;
@ -77,11 +81,13 @@ public class SqlSecurityIT extends ESRestTestCase {
return;
}
StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_id\":\"1\"}\n");
bulk.append("{\"index\":{\"_index\": \"test\", \"_type\": \"doc\", \"_id\":\"1\"}\n");
bulk.append("{\"a\": 1, \"b\": 2, \"c\": 3}\n");
bulk.append("{\"index\":{\"_id\":\"2\"}\n");
bulk.append("{\"index\":{\"_index\": \"test\", \"_type\": \"doc\", \"_id\":\"2\"}\n");
bulk.append("{\"a\": 4, \"b\": 5, \"c\": 6}\n");
client().performRequest("PUT", "/test/test/_bulk", singletonMap("refresh", "true"),
bulk.append("{\"index\":{\"_index\": \"bort\", \"_type\": \"doc\", \"_id\":\"1\"}\n");
bulk.append("{\"a\": \"test\"}\n");
client().performRequest("PUT", "/_bulk", singletonMap("refresh", "true"),
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
/* Wait for the audit log to go quiet and then clear it to protect
* us from log events coming from other tests. */
@ -140,9 +146,9 @@ public class SqlSecurityIT extends ESRestTestCase {
// NOCOMMIT we're going to need to test jdbc and cli with these too!
// NOCOMMIT we'll have to test scrolling as well
// NOCOMMIT tests for describing a table and showing tables
// NOCOMMIT assert that we don't have more audit logs then what we expect.
public void testSqlWorksAsAdmin() throws Exception {
public void testQueryWorksAsAdmin() throws Exception {
Map<String, Object> expected = new HashMap<>();
expected.put("columns", Arrays.asList(
columnInfo("a", "long"),
@ -153,18 +159,18 @@ public class SqlSecurityIT extends ESRestTestCase {
Arrays.asList(4, 5, 6)));
expected.put("size", 2);
assertResponse(expected, runSql("SELECT * FROM test ORDER BY a", null));
assertAuditForSqlGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
}
public void testSqlWithFullAccess() throws Exception {
createUser("full_access", "read_test");
public void testQueryWithFullAccess() throws Exception {
createUser("full_access", "read_all");
assertResponse(runSql("SELECT * FROM test ORDER BY a", null), runSql("SELECT * FROM test ORDER BY a", "full_access"));
assertAuditForSqlGranted("test_admin", "test");
assertAuditForSqlGranted("full_access", "test");
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("full_access", "test");
}
public void testSqlNoAccess() throws Exception {
public void testQueryNoAccess() throws Exception {
createUser("no_access", "read_nothing");
ResponseException e = expectThrows(ResponseException.class, () -> runSql("SELECT * FROM test", "no_access"));
@ -174,7 +180,7 @@ public class SqlSecurityIT extends ESRestTestCase {
&& "no_access".equals(m.get("principal")));
}
public void testSqlWrongAccess() throws Exception {
public void testQueryWrongAccess() throws Exception {
createUser("wrong_access", "read_something_else");
ResponseException e = expectThrows(ResponseException.class, () -> runSql("SELECT * FROM test", "wrong_access"));
@ -192,12 +198,12 @@ public class SqlSecurityIT extends ESRestTestCase {
&& "wrong_access".equals(m.get("principal")));
}
public void testSqlSingleFieldGranted() throws Exception {
public void testQuerySingleFieldGranted() throws Exception {
createUser("only_a", "read_test_a");
assertResponse(runSql("SELECT a FROM test", null), runSql("SELECT * FROM test", "only_a"));
assertAuditForSqlGranted("test_admin", "test");
assertAuditForSqlGranted("only_a", "test");
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("only_a", "test");
clearAuditEvents();
expectBadRequest(() -> runSql("SELECT c FROM test", "only_a"), containsString("line 1:8: Unresolved item 'c'"));
/* The user has permission to query the index but one of the
@ -206,15 +212,15 @@ public class SqlSecurityIT extends ESRestTestCase {
* query from the audit side because all the permissions checked
* out but it failed in SQL because it couldn't compile the
* query without the metadata for the missing field. */
assertAuditForSqlGranted("only_a", "test");
assertAuditForSqlGetTableSyncGranted("only_a", "test");
}
public void testSqlSingleFieldExcepted() throws Exception {
public void testQuerySingleFieldExcepted() throws Exception {
createUser("not_c", "read_test_a_and_b");
assertResponse(runSql("SELECT a, b FROM test", null), runSql("SELECT * FROM test", "not_c"));
assertAuditForSqlGranted("test_admin", "test");
assertAuditForSqlGranted("not_c", "test");
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("not_c", "test");
clearAuditEvents();
expectBadRequest(() -> runSql("SELECT c FROM test", "not_c"), containsString("line 1:8: Unresolved item 'c'"));
/* The user has permission to query the index but one of the
@ -223,14 +229,164 @@ public class SqlSecurityIT extends ESRestTestCase {
* query from the audit side because all the permissions checked
* out but it failed in SQL because it couldn't compile the
* query without the metadata for the missing field. */
assertAuditForSqlGranted("not_c", "test");
assertAuditForSqlGetTableSyncGranted("not_c", "test");
}
public void testSqlDocumentExclued() throws Exception {
public void testQueryDocumentExclued() throws Exception {
createUser("no_3s", "read_test_without_c_3");
assertResponse(runSql("SELECT * FROM test WHERE c != 3", null), runSql("SELECT * FROM test", "no_3s"));
assertAuditForSqlGranted("no_3s", "test");
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("no_3s", "test");
}
public void testShowTablesWorksAsAdmin() throws Exception {
Map<String, Object> expected = new HashMap<>();
expected.put("columns", singletonList(columnInfo("table", "keyword")));
expected.put("rows", Arrays.asList(
singletonList("bort"),
singletonList("test")));
expected.put("size", 2);
assertResponse(expected, runSql("SHOW TABLES", null));
assertAuditEvents(
audit(true, SqlAction.NAME, "test_admin", null),
audit(true, SqlGetIndicesAction.NAME, "test_admin", hasItems("test", "bort")));
}
public void testShowTablesWorksAsFullAccess() throws Exception {
createUser("full_access", "read_all");
assertResponse(runSql("SHOW TABLES", null), runSql("SHOW TABLES", "full_access"));
assertAuditEvents(
audit(true, SqlAction.NAME, "test_admin", null),
audit(true, SqlGetIndicesAction.NAME, "test_admin", hasItems("test", "bort")),
audit(true, SqlAction.NAME, "full_access", null),
audit(true, SqlGetIndicesAction.NAME, "full_access", hasItems("test", "bort")));
}
public void testShowTablesWithNoAccess() throws Exception {
createUser("no_access", "read_nothing");
ResponseException e = expectThrows(ResponseException.class, () -> runSql("SHOW TABLES", "no_access"));
assertThat(e.getMessage(), containsString("403 Forbidden"));
assertAuditEvents(audit(false, SqlAction.NAME, "no_access", null));
}
public void testShowTablesWithLimitedAccess() throws Exception {
createUser("read_bort", "read_bort");
assertResponse(runSql("SHOW TABLES LIKE 'bort'", null), runSql("SHOW TABLES", "read_bort"));
assertAuditForSqlGetTableSyncGranted("test_admin", "bort");
assertAuditEvents(
audit(true, SqlAction.NAME, "test_admin", null),
audit(true, SqlGetIndicesAction.NAME, "test_admin", contains("bort")),
audit(true, SqlAction.NAME, "read_bort", null),
audit(true, SqlGetIndicesAction.NAME, "read_bort", contains("bort")));
}
public void testShowTablesWithLimitedAccessAndPattern() throws Exception {
createUser("read_bort", "read_bort");
Map<String, Object> expected = new HashMap<>();
expected.put("columns", singletonList(columnInfo("table", "keyword")));
expected.put("rows", emptyList());
expected.put("size", 0);
assertResponse(expected, runSql("SHOW TABLES LIKE 'test'", "read_bort"));
assertAuditEvents(
audit(true, SqlAction.NAME, "read_bort", null),
audit(true, SqlGetIndicesAction.NAME, "read_bort", contains("*", "-*")));
}
public void testDescribeWorksAsAdmin() throws Exception {
Map<String, Object> expected = new HashMap<>();
expected.put("columns", Arrays.asList(
columnInfo("column", "keyword"),
columnInfo("type", "keyword")));
expected.put("rows", Arrays.asList(
Arrays.asList("a", "BIGINT"),
Arrays.asList("b", "BIGINT"),
Arrays.asList("c", "BIGINT")));
expected.put("size", 3);
assertResponse(expected, runSql("DESCRIBE test", null));
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
}
public void testDescribeWorksAsFullAccess() throws Exception {
createUser("full_access", "read_all");
assertResponse(runSql("DESCRIBE test", null), runSql("DESCRIBE test", "full_access"));
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("full_access", "test");
}
public void testDescribeWithNoAccess() throws Exception {
createUser("no_access", "read_nothing");
ResponseException e = expectThrows(ResponseException.class, () -> runSql("DESCRIBE test", "no_access"));
assertThat(e.getMessage(), containsString("403 Forbidden"));
assertAuditEvents(m -> "access_denied".equals(m.get("event_type"))
&& m.get("indices") == null
&& "no_access".equals(m.get("principal")));
}
public void testDescribeWithWrongAccess() throws Exception {
createUser("wrong_access", "read_something_else");
ResponseException e = expectThrows(ResponseException.class, () -> runSql("DESCRIBE test", "wrong_access"));
assertThat(e.getMessage(), containsString("403 Forbidden"));
assertAuditEvents(
/* This user has permission to run sql queries so they are
* given preliminary authorization. */
m -> "access_granted".equals(m.get("event_type"))
&& null == m.get("indices")
&& "wrong_access".equals(m.get("principal")),
/* But as soon as they attempt to resolve an index that
* they don't have access to they get denied. */
m -> "access_denied".equals(m.get("event_type"))
&& singletonList("test").equals(m.get("indices"))
&& "wrong_access".equals(m.get("principal")));
}
public void testDescribeSingleFieldGranted() throws Exception {
createUser("only_a", "read_test_a");
Map<String, Object> expected = new HashMap<>();
expected.put("columns", Arrays.asList(
columnInfo("column", "keyword"),
columnInfo("type", "keyword")));
expected.put("rows", singletonList(Arrays.asList("a", "BIGINT")));
expected.put("size", 1);
assertResponse(expected, runSql("DESCRIBE test", "only_a"));
assertAuditForSqlGetTableSyncGranted("only_a", "test");
clearAuditEvents();
}
public void testDescribeSingleFieldExcepted() throws Exception {
createUser("not_c", "read_test_a_and_b");
Map<String, Object> expected = new HashMap<>();
expected.put("columns", Arrays.asList(
columnInfo("column", "keyword"),
columnInfo("type", "keyword")));
expected.put("rows", Arrays.asList(
Arrays.asList("a", "BIGINT"),
Arrays.asList("b", "BIGINT")));
expected.put("size", 2);
assertResponse(expected, runSql("DESCRIBE test", "not_c"));
assertAuditForSqlGetTableSyncGranted("not_c", "test");
clearAuditEvents();
}
public void testDescribeDocumentExclued() throws Exception {
createUser("no_3s", "read_test_without_c_3");
assertResponse(runSql("DESCRIBE test", null), runSql("DESCRIBE test", "no_3s"));
assertAuditForSqlGetTableSyncGranted("test_admin", "test");
assertAuditForSqlGetTableSyncGranted("no_3s", "test");
}
private void expectBadRequest(ThrowingRunnable code, Matcher<String> errorMessageMatcher) {
@ -271,12 +427,14 @@ public class SqlSecurityIT extends ESRestTestCase {
new StringEntity(user.string(), ContentType.APPLICATION_JSON));
}
private void assertAuditForSqlGranted(String user, String index) throws Exception {
private void assertAuditForSqlGetTableSyncGranted(String user, String index) throws Exception {
assertAuditEvents(
m -> "access_granted".equals(m.get("event_type"))
&& SqlAction.NAME.equals(m.get("action"))
&& m.get("indices") == null
&& user.equals(m.get("principal")),
m -> "access_granted".equals(m.get("event_type"))
&& SqlAction.NAME.equals(m.get("action"))
&& singletonList(index).equals(m.get("indices"))
&& user.equals(m.get("principal")));
}
@ -292,11 +450,23 @@ public class SqlSecurityIT extends ESRestTestCase {
assertBusy(() -> {
XContentBuilder search = JsonXContent.contentBuilder().prettyPrint();
search.startObject(); {
search.array("_source", "@timestamp", "indices", "principal", "event_type");
search.startObject("query"); {
search.startObject("bool"); {
search.startArray("should"); {
search.startObject(); {
search.startObject("match").field("action", SqlAction.NAME).endObject();
}
search.endObject();
search.startObject(); {
search.startObject("match").field("action", SqlGetIndicesAction.NAME).endObject();
}
search.endObject();
}
search.endArray();
}
search.endObject();
}
search.endObject();
}
search.endObject();
Map<String, Object> audit;
@ -341,6 +511,15 @@ public class SqlSecurityIT extends ESRestTestCase {
}
}
private CheckedFunction<Map<?, ?>, Boolean, Exception> audit(boolean granted, String action,
String principal, Matcher<? extends Iterable<? extends String>> indicesMatcher) {
String eventType = granted ? "access_granted" : "access_denied";
return m -> eventType.equals(m.get("event_type"))
&& action.equals(m.get("action"))
&& principal.equals(m.get("principal"))
&& (indicesMatcher == null ? false == m.containsKey("indices") : indicesMatcher.matches(m.get("indices")));
}
private void clearAuditEvents() throws Exception {
try {
assertBusy(() -> {

View File

@ -28,8 +28,8 @@ class CliProtoHandler extends ProtoHandler<Response> {
CliProtoHandler(Client client) {
super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response));
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
this.server = new CliServer(TestUtils.planExecutor(client), clusterName,
() -> info.getNode().getName(), info.getVersion(), info.getBuild());
}
@Override

View File

@ -28,8 +28,8 @@ class SqlProtoHandler extends ProtoHandler<Response> {
SqlProtoHandler(Client client) {
super(client, response -> AbstractSqlServer.write(AbstractProto.CURRENT_VERSION, response));
this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName,
() -> info.getNode().getName(), info.getVersion(), info.getBuild());
}
@Override

View File

@ -10,7 +10,6 @@ import org.elasticsearch.xpack.sql.analysis.AnalysisException;
import org.elasticsearch.xpack.sql.analysis.UnknownFunctionException;
import org.elasticsearch.xpack.sql.analysis.UnknownIndexException;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier.Failure;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.capabilities.Resolvables;
import org.elasticsearch.xpack.sql.expression.Alias;
@ -71,11 +70,11 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
public class Analyzer extends RuleExecutor<LogicalPlan> {
private final Catalog catalog;
private final SqlSession session;
private final FunctionRegistry functionRegistry;
public Analyzer(Catalog catalog, FunctionRegistry functionRegistry) {
this.catalog = catalog;
public Analyzer(SqlSession session, FunctionRegistry functionRegistry) {
this.session = session;
this.functionRegistry = functionRegistry;
}
@ -243,7 +242,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
TableIdentifier table = plan.table();
EsIndex found;
try {
found = catalog.getIndex(table.index());
found = session.getIndexSync(table.index());
} catch (SqlIllegalArgumentException e) {
throw new AnalysisException(plan, e.getMessage(), e);
}

View File

@ -5,20 +5,21 @@
*/
package org.elasticsearch.xpack.sql.analysis.catalog;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import java.util.List;
/**
* Converts from Elasticsearch's internal metadata ({@link ClusterState})
* into a representation that is compatible with SQL (@{link {@link EsIndex}).
*/
public interface Catalog {
/**
* Lookup the information for a table, returning {@code null} if
* the index is not found.
* @throws SqlIllegalArgumentException if the index is in some way invalid for use with sql
* @throws SqlIllegalArgumentException if the index is in some way invalid
* for use with SQL
*/
@Nullable
EsIndex getIndex(String index) throws SqlIllegalArgumentException;
List<EsIndex> listIndices(String pattern);
}

View File

@ -7,99 +7,58 @@ package org.elasticsearch.xpack.sql.analysis.catalog;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
public class EsCatalog implements Catalog {
private final ClusterState clusterState;
private final Supplier<ClusterState> clusterState;
private IndexNameExpressionResolver indexNameExpressionResolver;
public EsCatalog(Supplier<ClusterState> clusterState) {
public EsCatalog(ClusterState clusterState) {
this.clusterState = clusterState;
}
@Inject // NOCOMMIT more to ctor and move resolver to createComponents
public void setIndexNameExpressionResolver(IndexNameExpressionResolver resolver) {
this.indexNameExpressionResolver = resolver;
}
private MetaData metadata() {
return clusterState.get().getMetaData();
}
@Override
public EsIndex getIndex(String index) throws SqlIllegalArgumentException {
MetaData metadata = metadata();
IndexMetaData idx = metadata.index(index);
IndexMetaData idx = clusterState.getMetaData().index(index);
if (idx == null) {
return null;
}
return EsIndex.build(idx, singleType(idx, false));
if (idx.getIndex().getName().startsWith(".")) {
/* Indices that start with "." are considered internal and
* should not be available to SQL. */
return null;
}
@Override
public List<EsIndex> listIndices(String pattern) {
Iterator<IndexMetaData> indexMetadata = null;
MetaData md = metadata();
if (pattern == null) {
indexMetadata = md.indices().valuesIt();
MappingMetaData type = singleType(idx.getMappings(), idx.getIndex().getName());
if (type == null) {
return null;
}
else {
String[] indexNames = resolveIndex(pattern);
List<IndexMetaData> indices = new ArrayList<>(indexNames.length);
for (String indexName : indexNames) {
indices.add(md.index(indexName));
}
indexMetadata = indices.iterator();
}
List<EsIndex> list = new ArrayList<>();
// filter unsupported (indices with more than one type) indices
while (indexMetadata.hasNext()) {
IndexMetaData imd = indexMetadata.next();
MappingMetaData type = singleType(imd, true);
if (type != null) {
list.add(EsIndex.build(imd, type));
}
}
return list;
return EsIndex.build(idx, type);
}
/**
* Return the single type in the index of {@code null} if there
* are no types in the index.
* @param badIndicesAreNull if true then return null for indices with
* more than one type, if false throw an exception for such indices
* Return the single type in the index, {@code null} if there
* are no types in the index, and throw a {@link SqlIllegalArgumentException}
* if there are multiple types in the index.
*/
@Nullable
private MappingMetaData singleType(IndexMetaData index, boolean badIndicesAreNull) {
public MappingMetaData singleType(ImmutableOpenMap<String, MappingMetaData> mappings, String name) {
/* We actually ignore the _default_ mapping because it is still
* allowed but deprecated. */
MappingMetaData result = null;
List<String> typeNames = null;
for (ObjectObjectCursor<String, MappingMetaData> type : index.getMappings()) {
for (ObjectObjectCursor<String, MappingMetaData> type : mappings) {
if ("_default_".equals(type.key)) {
continue;
}
if (result != null) {
if (badIndicesAreNull) {
return null;
}
if (typeNames == null) {
typeNames = new ArrayList<>();
typeNames.add(result.type());
@ -112,11 +71,6 @@ public class EsCatalog implements Catalog {
return result;
}
Collections.sort(typeNames);
throw new IllegalArgumentException("[" + index.getIndex().getName() + "] has more than one type " + typeNames);
}
private String[] resolveIndex(String pattern) {
// NOCOMMIT we should use the cluster state that we resolve when we fetch the metadata so it is the *same* so we don't have weird errors when indices are deleted
return indexNameExpressionResolver.concreteIndexNames(clusterState.get(), IndicesOptions.strictExpandOpenAndForbidClosed(), pattern);
throw new SqlIllegalArgumentException("[" + name + "] has more than one type " + typeNames);
}
}

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.analysis.catalog;
import java.util.List;
/**
* {@link Catalog} implementation that filters the results.
*/
@ -28,15 +26,8 @@ public class FilteredCatalog implements Catalog {
this.filter = filter;
}
@Override
public List<EsIndex> listIndices(String pattern) {
// NOCOMMIT authorize me
return delegate.listIndices(pattern);
}
@Override
public EsIndex getIndex(String index) {
// NOCOMMIT we need to think really carefully about how we deal with aliases that resolve into multiple indices.
EsIndex result = delegate.getIndex(index);
if (result == null) {
return null;

View File

@ -7,45 +7,56 @@ package org.elasticsearch.xpack.sql.execution;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.expression.function.DefaultFunctionRegistry;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class PlanExecutor {
private final Client client;
private final Supplier<ClusterState> stateSupplier;
/**
* The way that we resolve indices asynchronously. This must
* be passed in to support embedded mode. Otherwise we could
* use the {@link #client} directly.
*/
private final BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices;
private final Function<ClusterState, Catalog> catalogSupplier;
private final SqlParser parser;
private Catalog catalog;
private final FunctionRegistry functionRegistry;
private final Analyzer analyzer;
private final Optimizer optimizer;
private final Planner planner;
public PlanExecutor(Client client, Catalog catalog) {
public PlanExecutor(Client client, Supplier<ClusterState> stateSupplier,
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices,
Function<ClusterState, Catalog> catalogSupplier) {
this.client = client;
this.catalog = catalog;
this.stateSupplier = stateSupplier;
this.getIndices = getIndices;
this.catalogSupplier = catalogSupplier;
this.parser = new SqlParser();
this.functionRegistry = new DefaultFunctionRegistry();
this.analyzer = new Analyzer(catalog, functionRegistry);
this.optimizer = new Optimizer(catalog);
this.optimizer = new Optimizer();
this.planner = new Planner();
}
public Catalog catalog() {
return catalog;
}
public SqlSession newSession(SqlSettings settings) {
return new SqlSession(settings, client, parser, catalog, functionRegistry, analyzer, optimizer, planner);
return new SqlSession(settings, client, getIndices, catalogSupplier.apply(stateSupplier.get()), parser,
functionRegistry, optimizer, planner);
}
public void sql(String sql, ActionListener<RowSetCursor> listener) {

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.optimizer;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.expression.Alias;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.AttributeSet;
@ -78,13 +77,6 @@ import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
public class Optimizer extends RuleExecutor<LogicalPlan> {
private final Catalog catalog;
public Optimizer(Catalog catalog) {
this.catalog = catalog;
}
public ExecutionInfo debugOptimize(LogicalPlan verified) {
return verified.optimized() ? null : executeWithInfo(verified);
}

View File

@ -26,7 +26,7 @@ public abstract class Command extends LogicalPlan implements Executable {
}
@Override
public final void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
listener.onResponse(execute(session));
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
@ -47,11 +46,10 @@ public class ShowColumns extends Command {
@Override
protected RowSetCursor execute(SqlSession session) {
Catalog catalog = session.catalog();
List<List<?>> rows = new ArrayList<>();
EsIndex fetched;
try {
fetched = catalog.getIndex(index);
fetched = session.getIndexSync(index);
} catch (SqlIllegalArgumentException e) {
throw new IllegalArgumentException(e);
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
@ -13,21 +16,21 @@ import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
public class ShowTables extends Command {
@Nullable
private final String pattern;
public ShowTables(Location location, String pattern) {
public ShowTables(Location location, @Nullable String pattern) {
super(location);
this.pattern = pattern;
}
@ -42,14 +45,18 @@ public class ShowTables extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
List<EsIndex> indices = session.catalog().listIndices(pattern);
// Consistent sorting is nice both for testing and humans
Collections.sort(indices, comparing(EsIndex::name));
return Rows.of(output(), indices.stream()
public final void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*";
session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
listener.onResponse(Rows.of(output(), result.stream()
.map(t -> singletonList(t.name()))
.collect(toList()));
.collect(toList())));
}, listener::onFailure));
}
@Override
protected RowSetCursor execute(SqlSession session) {
throw new UnsupportedOperationException("No synchronous exec");
}
@Override

View File

@ -0,0 +1,244 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import static java.util.Comparator.comparing;
public class SqlGetIndicesAction
extends Action<SqlGetIndicesAction.Request, SqlGetIndicesAction.Response, SqlGetIndicesAction.RequestBuilder> {
public static final SqlGetIndicesAction INSTANCE = new SqlGetIndicesAction();
public static final String NAME = "indices:data/read/sql/tables";
private SqlGetIndicesAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
private IndicesOptions indicesOptions;
private String[] indices;
/**
* Deserialization and builder ctor.
*/
Request() {}
/**
* Sensible ctor.
*/
public Request(IndicesOptions indicesOptions, String... indices) {
this.indicesOptions = indicesOptions;
this.indices = indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indicesOptions = IndicesOptions.readIndicesOptions(in);
indices = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
indicesOptions.writeIndicesOptions(out);
out.writeStringArray(indices);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public String[] indices() {
return indices;
}
@Override
public Request indices(String... indices) {
this.indices = indices;
return this;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
public Request indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, SqlGetIndicesAction action) {
super(client, action, new Request());
}
RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return this;
}
RequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
}
public static class Response extends ActionResponse {
private List<EsIndex> indices;
/**
* Deserialization ctor.
*/
Response() {}
public Response(List<EsIndex> indices) {
this.indices = indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
throw new UnsupportedOperationException("Must be requested locally for now");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
throw new UnsupportedOperationException("Must be requested locally for now");
}
public List<EsIndex> indices() {
return indices;
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
private final Function<ClusterState, Catalog> catalog;
private final SqlLicenseChecker licenseChecker;
@Inject
public TransportAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, CatalogHolder catalog, SqlLicenseChecker licenseChecker) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.catalog = catalog.catalog;
this.licenseChecker = licenseChecker;
}
@Override
protected String executor() {
// read operation, lightweight...
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
licenseChecker.checkIfSqlAllowed();
operation(indexNameExpressionResolver, catalog, request, state, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNames(state, request));
}
/**
* Class that holds that {@link Catalog} to aid in guice binding.
*/
public static class CatalogHolder {
final Function<ClusterState, Catalog> catalog;
public CatalogHolder(Function<ClusterState, Catalog> catalog) {
this.catalog = catalog;
}
}
}
/**
* Actually looks up the indices in the cluster state and converts
* them into {@link EsIndex} instances. The rest of the contents of
* this class integrates this behavior cleanly into Elasticsearch,
* makes sure that we only try and read the cluster state when it is
* ready, integrate with security to filter the requested indices to
* what the user has permission to access, and leaves an appropriate
* audit trail.
*/
public static void operation(IndexNameExpressionResolver indexNameExpressionResolver, Function<ClusterState, Catalog> catalog,
Request request, ClusterState state, ActionListener<Response> listener) {
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request);
List<EsIndex> results = new ArrayList<>(concreteIndices.length);
for (String index : concreteIndices) {
EsIndex esIndex;
try {
esIndex = catalog.apply(state).getIndex(index);
} catch (SqlIllegalArgumentException e) {
assert e.getMessage().contains("has more than one type");
esIndex = null;
}
if (esIndex != null) {
results.add(esIndex);
}
}
// Consistent sorting is better for testing and for humans
Collections.sort(results, comparing(EsIndex::name));
listener.onResponse(new Response(results));
}
}

View File

@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -24,6 +26,7 @@ import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.analysis.catalog.FilteredCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction.TransportAction.CatalogHolder;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliHttpHandler;
import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction;
@ -38,6 +41,8 @@ import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class SqlPlugin implements ActionPlugin {
@ -57,12 +62,17 @@ public class SqlPlugin implements ActionPlugin {
*/
public Collection<Object> createComponents(Client client, ClusterService clusterService,
@Nullable FilteredCatalog.Filter catalogFilter) {
EsCatalog esCatalog = new EsCatalog(() -> clusterService.state());
Catalog catalog = catalogFilter == null ? esCatalog : new FilteredCatalog(esCatalog, catalogFilter);
Function<ClusterState, Catalog> catalog = EsCatalog::new;
if (catalogFilter != null) {
catalog = catalog.andThen(c -> new FilteredCatalog(c, catalogFilter));
}
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> getIndices = (request, listener) -> {
client.execute(SqlGetIndicesAction.INSTANCE, request, listener);
};
return Arrays.asList(
esCatalog, // Added as a component so that it can get IndexNameExpressionResolver injected.
new CatalogHolder(catalog),
sqlLicenseChecker,
new PlanExecutor(client, catalog));
new PlanExecutor(client, clusterService::state, getIndices, catalog));
}
@Override
@ -79,6 +89,7 @@ public class SqlPlugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class),
new ActionHandler<>(CliAction.INSTANCE, TransportCliAction.class),
new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class));
new ActionHandler<>(JdbcAction.INSTANCE, TransportJdbcAction.class),
new ActionHandler<>(SqlGetIndicesAction.INSTANCE, SqlGetIndicesAction.TransportAction.class));
}
}

View File

@ -38,7 +38,8 @@ public class TransportCliAction extends HandledTransportAction<CliRequest, CliRe
SqlLicenseChecker sqlLicenseChecker) {
super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
this.sqlLicenseChecker = sqlLicenseChecker;
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(),
() -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
}
@Override

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.plugin.jdbc;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
@ -37,7 +38,6 @@ import org.elasticsearch.xpack.sql.util.StringUtils;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Supplier;
@ -66,10 +66,10 @@ public class JdbcServer extends AbstractSqlServer {
listener.onResponse(info((InfoRequest) req));
break;
case META_TABLE:
listener.onResponse(metaTable((MetaTableRequest) req));
metaTable((MetaTableRequest) req, listener);
break;
case META_COLUMN:
listener.onResponse(metaColumn((MetaColumnRequest) req));
metaColumn((MetaColumnRequest) req, listener);
break;
case QUERY_INIT:
queryInit((QueryInitRequest) req, listener);
@ -97,24 +97,25 @@ public class JdbcServer extends AbstractSqlServer {
return infoResponse.get();
}
public MetaTableResponse metaTable(MetaTableRequest req) {
public void metaTable(MetaTableRequest req, ActionListener<Response> listener) {
String indexPattern = hasText(req.pattern()) ? StringUtils.jdbcToEsPattern(req.pattern()) : "*";
Collection<EsIndex> indices = executor.catalog().listIndices(indexPattern);
return new MetaTableResponse(indices.stream()
executor.newSession(SqlSettings.EMPTY)
.getIndices(new String[] {indexPattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
listener.onResponse(new MetaTableResponse(result.stream()
.map(EsIndex::name)
.collect(toList()));
.collect(toList())));
}, listener::onFailure));
}
public MetaColumnResponse metaColumn(MetaColumnRequest req) {
public void metaColumn(MetaColumnRequest req, ActionListener<Response> listener) {
String pattern = Strings.hasText(req.tablePattern()) ? StringUtils.jdbcToEsPattern(req.tablePattern()) : "*";
Collection<EsIndex> indices = executor.catalog().listIndices(pattern);
Pattern columnMatcher = hasText(req.columnPattern()) ? StringUtils.likeRegex(req.columnPattern()) : null;
executor.newSession(SqlSettings.EMPTY)
.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
List<MetaColumnInfo> resp = new ArrayList<>();
for (EsIndex esIndex : indices) {
for (EsIndex esIndex : result) {
int pos = 0;
for (Entry<String, DataType> entry : esIndex.mapping().entrySet()) {
pos++;
@ -127,8 +128,8 @@ public class JdbcServer extends AbstractSqlServer {
}
}
}
return new MetaColumnResponse(resp);
listener.onResponse(new MetaColumnResponse(resp));
}, listener::onFailure));
}

View File

@ -38,7 +38,8 @@ public class TransportJdbcAction extends HandledTransportAction<JdbcRequest, Jdb
SqlLicenseChecker sqlLicenseChecker) {
super(settings, JdbcAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, JdbcRequest::new);
this.sqlLicenseChecker = sqlLicenseChecker;
this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(),
() -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
}
@Override

View File

@ -6,10 +6,12 @@
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
@ -17,17 +19,20 @@ import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.plugin.SqlGetIndicesAction;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
public class SqlSession {
private final Client client;
private final BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> sqlGetIndicesAction;
private final Catalog catalog;
private final SqlParser parser;
private final Catalog catalog;
private final FunctionRegistry functionRegistry;
private final Analyzer analyzer;
private final Optimizer optimizer;
private final Planner planner;
@ -36,7 +41,6 @@ public class SqlSession {
// thread-local used for sharing settings across the plan compilation
public static final ThreadLocal<SqlSettings> CURRENT = new ThreadLocal<SqlSettings>() {
@Override
public String toString() {
return "SQL Session";
@ -44,18 +48,20 @@ public class SqlSession {
};
public SqlSession(SqlSession other) {
this(other.defaults(), other.client(), other.parser, other.catalog(), other.functionRegistry(), other.analyzer(), other.optimizer(), other.planner());
this(other.defaults(), other.client(), other.sqlGetIndicesAction, other.catalog(), other.parser,
other.functionRegistry(), other.optimizer(), other.planner());
}
public SqlSession(SqlSettings defaults,
Client client, SqlParser parser, Catalog catalog,
FunctionRegistry functionRegistry, Analyzer analyzer, Optimizer optimizer, Planner planner) {
public SqlSession(SqlSettings defaults, Client client,
BiConsumer<SqlGetIndicesAction.Request, ActionListener<SqlGetIndicesAction.Response>> sqlGetIndicesAction,
Catalog catalog, SqlParser parser, FunctionRegistry functionRegistry, Optimizer optimizer,
Planner planner) {
this.client = client;
this.sqlGetIndicesAction = sqlGetIndicesAction;
this.catalog = catalog;
this.parser = parser;
this.catalog = catalog;
this.functionRegistry = functionRegistry;
this.analyzer = analyzer;
this.optimizer = optimizer;
this.planner = planner;
@ -75,12 +81,29 @@ public class SqlSession {
return client;
}
/**
* Get the indices matching a pattern. Prefer this method if possible.
*/
public void getIndices(String[] patterns, IndicesOptions options, ActionListener<List<EsIndex>> listener) {
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(options, patterns).local(true);
sqlGetIndicesAction.accept(request, ActionListener.wrap(response -> {
listener.onResponse(response.indices());
}, listener::onFailure));
}
/**
* Get an index. Prefer not to use this method as it cannot be made to work with cross cluster search.
*/
public EsIndex getIndexSync(String index) {
return catalog.getIndex(index);
}
public Planner planner() {
return planner;
}
public Analyzer analyzer() {
return analyzer;
return new Analyzer(this, functionRegistry);
}
public Optimizer optimizer() {
@ -96,6 +119,7 @@ public class SqlSession {
}
public LogicalPlan analyzedPlan(LogicalPlan plan, boolean verify) {
Analyzer analyzer = analyzer();
return verify ? analyzer.verify(analyzer.analyze(plan)) : analyzer.analyze(plan);
}

View File

@ -9,75 +9,58 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.hasSize;
public class EsCatalogTests extends ESTestCase {
public void testEmpty() {
EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT).build());
assertEquals(emptyList(), catalog.listIndices("*"));
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT).build());
assertNull(catalog.getIndex("test"));
}
public void testIndexExists() throws IOException {
EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT)
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.put(index("test")
.put(index()
.putMapping("test", "{}"))
.build())
.build());
List<EsIndex> indices = catalog.listIndices("*");
assertThat(indices, hasSize(1));
assertEquals("test", indices.get(0).name());
assertEquals(emptyMap(), catalog.getIndex("test").mapping());
}
public void testIndexWithDefaultType() throws IOException {
EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT)
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.put(index("test")
.put(index()
.putMapping("test", "{}")
.putMapping("_default_", "{}"))
.build())
.build());
List<EsIndex> indices = catalog.listIndices("*");
assertThat(indices, hasSize(1));
assertEquals("test", indices.get(0).name());
assertEquals(emptyMap(), catalog.getIndex("test").mapping());
}
public void testIndexWithTwoTypes() throws IOException {
EsCatalog catalog = catalog(ClusterState.builder(ClusterName.DEFAULT)
Catalog catalog = new EsCatalog(ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.put(index("test")
.put(index()
.putMapping("first_type", "{}")
.putMapping("second_type", "{}"))
.build())
.build());
assertEquals(emptyList(), catalog.listIndices("*"));
Exception e = expectThrows(IllegalArgumentException.class, () -> catalog.getIndex("test"));
Exception e = expectThrows(SqlIllegalArgumentException.class, () -> catalog.getIndex("test"));
assertEquals(e.getMessage(), "[test] has more than one type [first_type, second_type]");
}
private EsCatalog catalog(ClusterState state) {
EsCatalog catalog = new EsCatalog(() -> state);
catalog.setIndexNameExpressionResolver(new IndexNameExpressionResolver(Settings.EMPTY));
return catalog;
}
private IndexMetaData.Builder index(String name) throws IOException {
private IndexMetaData.Builder index() throws IOException {
return IndexMetaData.builder("test")
.settings(Settings.builder()
.put("index.version.created", Version.CURRENT)

View File

@ -5,13 +5,9 @@
*/
package org.elasticsearch.xpack.sql.analysis.catalog;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toMap;
@ -25,15 +21,6 @@ class InMemoryCatalog implements Catalog {
this.indices = indices.stream().collect(toMap(EsIndex::name, Function.identity()));
}
@Override
public List<EsIndex> listIndices(String pattern) {
Pattern p = StringUtils.likeRegex(pattern);
return indices.entrySet().stream()
.filter(e -> p.matcher(e.getKey()).matches())
.map(Map.Entry::getValue)
.collect(Collectors.toList());
}
@Override
public EsIndex getIndex(String index) {
return indices.get(index);

View File

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.hasSize;
public class SqlGetIndicesActionTests extends ESTestCase {
private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);
private final AtomicBoolean called = new AtomicBoolean(false);
private final AtomicReference<Exception> error = new AtomicReference<>();
public void testOperation() throws IOException {
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "test", "bar", "foo*");
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.put(index("test"))
.put(index("foo1"))
.put(index("foo2")))
.build();
ActionListener<SqlGetIndicesAction.Response> listener = new ActionListener<SqlGetIndicesAction.Response>() {
@Override
public void onResponse(SqlGetIndicesAction.Response response) {
assertThat(response.indices(), hasSize(3));
assertEquals("foo1", response.indices().get(0).name());
assertEquals("foo2", response.indices().get(1).name());
assertEquals("test", response.indices().get(2).name());
called.set(true);
}
@Override
public void onFailure(Exception e) {
error.set(e);
}
};
SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener);
if (error.get() != null) {
throw new AssertionError(error.get());
}
assertTrue(called.get());
}
public void testMultipleTypes() throws IOException {
SqlGetIndicesAction.Request request = new SqlGetIndicesAction.Request(IndicesOptions.lenientExpandOpen(), "foo*");
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.put(index("foo1"))
.put(index("foo2").putMapping("test2", "{}")))
.build();
final AtomicBoolean called = new AtomicBoolean(false);
final AtomicReference<Exception> error = new AtomicReference<>();
ActionListener<SqlGetIndicesAction.Response> listener = new ActionListener<SqlGetIndicesAction.Response>() {
@Override
public void onResponse(SqlGetIndicesAction.Response response) {
assertThat(response.indices(), hasSize(1));
assertEquals("foo1", response.indices().get(0).name());
called.set(true);
}
@Override
public void onFailure(Exception e) {
error.set(e);
}
};
SqlGetIndicesAction.operation(indexNameExpressionResolver, EsCatalog::new, request, clusterState, listener);
if (error.get() != null) {
throw new AssertionError(error.get());
}
assertTrue(called.get());
}
IndexMetaData.Builder index(String name) throws IOException {
return IndexMetaData.builder(name)
.settings(Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_VERSION_CREATED, Version.CURRENT))
.putMapping("test", "{}");
}
}