From 84b631643cd50393c55f897b2961a2f61545e566 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 26 Oct 2016 21:05:49 +0200 Subject: [PATCH] Add utility method to fetch and collect results from a query (elastic/elasticsearch#3894) Today we have the same madness in two places and no dedicated test. This change moves the real madness into a single place and adds a test for it to make sure it actually works and isn't just crazy. Original commit: elastic/x-pack-elasticsearch@dabf5fdd630858944f9e9b707583b804d3c0a2ed --- .../xpack/security/InternalClient.java | 76 ++++++++++++++++ .../xpack/security/Security.java | 1 - .../authc/esnative/NativeUsersStore.java | 82 +++-------------- .../authz/store/NativeRolesStore.java | 88 ++----------------- .../security/InternalClientIntegTests.java | 50 +++++++++++ 5 files changed, 144 insertions(+), 153 deletions(-) create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java index 9eebfab525a..0f630626e73 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java @@ -6,6 +6,12 @@ package org.elasticsearch.xpack.security; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; @@ -13,11 +19,18 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.node.Node; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.authc.Authentication; @@ -77,4 +90,67 @@ public class InternalClient extends FilterClient { throw new ElasticsearchException("failed to attach internal user to request", ioe); } } + + /** + * This method fetches all results for the given search request, parses them using the given hit parser and calls the + * listener once done. + */ + public static void fetchAllByEntity(Client client, SearchRequest request, final ActionListener> listener, + Function hitParser) { + final List results = new ArrayList<>(); + if (request.scroll() == null) { // we do scroll by default lets see if we can get rid of this at some point. + request.scroll(TimeValue.timeValueSeconds(10L)); + } + final Consumer clearScroll = (response) -> { + if (response != null && response.getScrollId() != null) { + ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(response.getScrollId()).request(); + client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {})); + } + }; + // This function is MADNESS! But it works, don't think about it too hard... + // simon edit: just watch this if you got this far https://www.youtube.com/watch?v=W-lF106Dgk8 + client.search(request, new ActionListener() { + private volatile SearchResponse lastResponse = null; + + @Override + public void onResponse(SearchResponse resp) { + try { + lastResponse = resp; + if (resp.getHits().getHits().length > 0) { + for (SearchHit hit : resp.getHits().getHits()) { + final T oneResult = hitParser.apply(hit); + if (oneResult != null) { + results.add(oneResult); + } + } + SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) + .setScroll(request.scroll().keepAlive()).request(); + client.searchScroll(scrollRequest, this); + + } else { + clearScroll.accept(resp); + // Finally, return the list of users + listener.onResponse(Collections.unmodifiableList(results)); + } + } catch (Exception e){ + onFailure(e); // lets clean up things + } + } + + @Override + public void onFailure(Exception t) { + try { + // attempt to clear the scroll request + clearScroll.accept(lastResponse); + } finally { + if (t instanceof IndexNotFoundException) { + // since this is expected to happen at times, we just call the listener with an empty list + listener.onResponse(Collections.emptyList()); + } else { + listener.onFailure(t); + } + } + } + }); + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java index f7857a92189..5de2fde245d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -404,7 +404,6 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { // authentication settings AnonymousUser.addSettings(settingsList); Realms.addSettings(settingsList); - NativeUsersStore.addSettings(settingsList); NativeRolesStore.addSettings(settingsList); AuthenticationService.addSettings(settingsList); AuthorizationService.addSettings(settingsList); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java index ced567299ea..6c19518ab8f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -33,8 +32,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; @@ -57,9 +54,7 @@ import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.User.Fields; import org.elasticsearch.xpack.security.user.XPackUser; -import org.elasticsearch.xpack.watcher.actions.Action; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -72,7 +67,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import static org.elasticsearch.xpack.security.Security.setting; import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate; /** @@ -84,12 +78,6 @@ import static org.elasticsearch.xpack.security.SecurityTemplateService.securityI */ public class NativeUsersStore extends AbstractComponent implements ClusterStateListener { - private static final Setting SCROLL_SIZE_SETTING = - Setting.intSetting(setting("authc.native.scroll.size"), 1000, Property.NodeScope); - - private static final Setting SCROLL_KEEP_ALIVE_SETTING = - Setting.timeSetting(setting("authc.native.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope); - public enum State { INITIALIZED, STARTING, @@ -106,8 +94,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final InternalClient client; private final boolean isTribeNode; - private int scrollSize; - private TimeValue scrollKeepAlive; private volatile boolean securityIndexExists = false; @@ -130,9 +116,9 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL } /** - * Retrieve a list of users, if usernames is null or empty, fetch all users + * Retrieve a list of users, if userNames is null or empty, fetch all users */ - public void getUsers(String[] usernames, final ActionListener> listener) { + public void getUsers(String[] userNames, final ActionListener> listener) { if (state() != State.STARTED) { logger.trace("attempted to get users before service was started"); listener.onFailure(new IllegalStateException("users cannot be retrieved as native user service has not been started")); @@ -147,68 +133,33 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL listener.onFailure(t); } }; - if (usernames.length == 1) { // optimization for single user lookup - final String username = usernames[0]; + if (userNames.length == 1) { // optimization for single user lookup + final String username = userNames[0]; getUserAndPassword(username, ActionListener.wrap( (uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())), handleException::accept)); } else { try { - final List users = new ArrayList<>(); - QueryBuilder query; - if (usernames == null || usernames.length == 0) { + final QueryBuilder query; + if (userNames == null || userNames.length == 0) { query = QueryBuilders.matchAllQuery(); } else { - query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(usernames)); + query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(userNames)); } SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME) - .setScroll(scrollKeepAlive) + .setScroll(TimeValue.timeValueSeconds(10L)) .setTypes(USER_DOC_TYPE) .setQuery(query) - .setSize(scrollSize) + .setSize(1000) .setFetchSource(true) .request(); request.indicesOptions().ignoreUnavailable(); - - // This function is MADNESS! But it works, don't think about it too hard... - client.search(request, new ActionListener() { - - private SearchResponse lastResponse = null; - - @Override - public void onResponse(final SearchResponse resp) { - lastResponse = resp; - boolean hasHits = resp.getHits().getHits().length > 0; - if (hasHits) { - for (SearchHit hit : resp.getHits().getHits()) { - UserAndPassword u = transformUser(hit.getId(), hit.getSource()); - if (u != null) { - users.add(u.user()); - } - } - SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) - .setScroll(scrollKeepAlive).request(); - client.searchScroll(scrollRequest, this); - } else { - if (resp.getScrollId() != null) { - clearScrollResponse(resp.getScrollId()); - } - // Finally, return the list of users - listener.onResponse(Collections.unmodifiableList(users)); - } - } - - @Override - public void onFailure(Exception t) { - // attempt to clear scroll response - if (lastResponse != null && lastResponse.getScrollId() != null) { - clearScrollResponse(lastResponse.getScrollId()); - } - handleException.accept(t); - } + InternalClient.fetchAllByEntity(client, request, listener, (hit) -> { + UserAndPassword u = transformUser(hit.getId(), hit.getSource()); + return u != null ? u.user() : null; }); } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(usernames)), e); + logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(userNames)), e); listener.onFailure(e); } } @@ -590,8 +541,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL public void start() { try { if (state.compareAndSet(State.INITIALIZED, State.STARTING)) { - this.scrollSize = SCROLL_SIZE_SETTING.get(settings); - this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings); state.set(State.STARTED); } } catch (Exception e) { @@ -841,9 +790,4 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL this.enabled = enabled; } } - - public static void addSettings(List> settings) { - settings.add(SCROLL_SIZE_SETTING); - settings.add(SCROLL_KEEP_ALIVE_SETTING); - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index 6403dd7d0d2..7ea22d8dec4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -16,14 +16,10 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.MultiSearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse.Item; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -45,7 +41,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.SecurityTemplateService; @@ -58,8 +53,8 @@ import org.elasticsearch.xpack.security.authz.permission.IndicesPermission.Group import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.client.SecurityClient; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -95,12 +90,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL FAILED } - private static final Setting SCROLL_SIZE_SETTING = - Setting.intSetting(setting("authz.store.roles.index.scroll.size"), 1000, Property.NodeScope); - - private static final Setting SCROLL_KEEP_ALIVE_SETTING = - Setting.timeSetting(setting("authz.store.roles.index.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope); - private static final Setting CACHE_SIZE_SETTING = Setting.intSetting(setting("authz.store.roles.index.cache.max_size"), 10000, Property.NodeScope); private static final Setting CACHE_TTL_SETTING = @@ -126,8 +115,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } private SecurityClient securityClient; - private int scrollSize; - private TimeValue scrollKeepAlive; // incremented each time the cache is invalidated private final AtomicLong numInvalidation = new AtomicLong(0); @@ -183,8 +170,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL try { if (state.compareAndSet(State.INITIALIZED, State.STARTING)) { this.securityClient = new SecurityClient(client); - this.scrollSize = SCROLL_SIZE_SETTING.get(settings); - this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings); state.set(State.STARTED); } } catch (Exception e) { @@ -202,7 +187,7 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL /** * Retrieve a list of roles, if rolesToGet is null or empty, fetch all roles */ - public void getRoleDescriptors(String[] names, final ActionListener> listener) { + public void getRoleDescriptors(String[] names, final ActionListener> listener) { if (state() != State.STARTED) { logger.trace("attempted to get roles before service was started"); listener.onFailure(new IllegalStateException("roles cannot be retrieved as native role service has not been started")); @@ -214,7 +199,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL : Collections.singletonList(roleAndVersion.getRoleDescriptor())), listener::onFailure)); } else { try { - final List roles = new ArrayList<>(); QueryBuilder query; if (names == null || names.length == 0) { query = QueryBuilders.matchAllQuery(); @@ -223,57 +207,13 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME) .setTypes(ROLE_DOC_TYPE) - .setScroll(scrollKeepAlive) + .setScroll(TimeValue.timeValueSeconds(10L)) .setQuery(query) - .setSize(scrollSize) + .setSize(1000) .setFetchSource(true) .request(); request.indicesOptions().ignoreUnavailable(); - - // This function is MADNESS! But it works, don't think about it too hard... - client.search(request, new ActionListener() { - - private SearchResponse lastResponse = null; - - @Override - public void onResponse(SearchResponse resp) { - lastResponse = resp; - boolean hasHits = resp.getHits().getHits().length > 0; - if (hasHits) { - for (SearchHit hit : resp.getHits().getHits()) { - RoleDescriptor rd = transformRole(hit.getId(), hit.getSourceRef(), logger); - if (rd != null) { - roles.add(rd); - } - } - SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) - .setScroll(scrollKeepAlive).request(); - client.searchScroll(scrollRequest, this); - } else { - if (resp.getScrollId() != null) { - clearScollRequest(resp.getScrollId()); - } - // Finally, return the list of users - listener.onResponse(Collections.unmodifiableList(roles)); - } - } - - @Override - public void onFailure(Exception t) { - // attempt to clear the scroll request - if (lastResponse != null && lastResponse.getScrollId() != null) { - clearScollRequest(lastResponse.getScrollId()); - } - - if (t instanceof IndexNotFoundException) { - logger.trace("could not retrieve roles because security index does not exist"); - // since this is expected to happen at times, we just call the listener with an empty list - listener.onResponse(Collections.emptyList()); - } else { - listener.onFailure(t); - } - } - }); + InternalClient.fetchAllByEntity(client, request, listener, (hit) -> transformRole(hit.getId(), hit.getSourceRef(), logger)); } catch (Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve roles {}", Arrays.toString(names)), e); listener.onFailure(e); @@ -523,22 +463,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } } - private void clearScollRequest(final String scrollId) { - ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(scrollId).request(); - client.clearScroll(clearScrollRequest, new ActionListener() { - @Override - public void onResponse(ClearScrollResponse response) { - // cool, it cleared, we don't really care though... - } - - @Override - public void onFailure(Exception t) { - // Not really much to do here except for warn about it... - logger.warn( - (Supplier) () -> new ParameterizedMessage("failed to clear scroll [{}] after retrieving roles", scrollId), t); - } - }); - } // FIXME hack for testing public void reset() { @@ -658,8 +582,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } public static void addSettings(List> settings) { - settings.add(SCROLL_SIZE_SETTING); - settings.add(SCROLL_KEEP_ALIVE_SETTING); settings.add(CACHE_SIZE_SETTING); settings.add(CACHE_TTL_SETTING); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java new file mode 100644 index 00000000000..b594c7a4322 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security; + +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class InternalClientIntegTests extends ESSingleNodeTestCase { + + public void testFetchAllEntities() throws ExecutionException, InterruptedException { + Client client = client(); + int numDocs = randomIntBetween(5, 30); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex("foo", "bar").setSource(Collections.singletonMap("number", i)).get(); + } + client.admin().indices().prepareRefresh("foo").get(); + SearchRequest request = client.prepareSearch() + .setScroll(TimeValue.timeValueHours(10L)) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(randomIntBetween(1, 10)) + .setFetchSource(true) + .request(); + request.indicesOptions().ignoreUnavailable(); + PlainActionFuture> future = new PlainActionFuture<>(); + InternalClient.fetchAllByEntity(client(), request, future, (hit) -> Integer.parseInt(hit.sourceAsMap().get("number").toString())); + Collection integers = future.actionGet(); + ArrayList list = new ArrayList<>(integers); + CollectionUtil.timSort(list); + assertEquals(numDocs, list.size()); + for (int i = 0; i < numDocs; i++) { + assertEquals(list.get(i).intValue(), i); + } + } +}