From 9f57afbdf349d159784ccca32b7d33a459ec605a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 26 Oct 2016 17:25:20 +0200 Subject: [PATCH 1/2] Return non-existing role if the .security index is not found (elastic/elasticsearch#3895) We used to be very lenient with all kinds of exceptions related to the `.security` index. Yet, sometimes in tests the index is not yet there but transport clients already pinging the node this causes issues and transport clients disconnect. Now if the index is not present we simply return no role. Original commit: elastic/x-pack-elasticsearch@60948d0c2ac89b291795dbfa52636f9aa7f16393 --- .../xpack/security/authz/store/NativeRolesStore.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index ac3643eecff..6403dd7d0d2 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -488,8 +488,14 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL @Override public void onFailure(Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); - roleActionListener.onFailure(e); + if (e instanceof IndexNotFoundException) { // if the index is not there we just claim the role is not there + logger.warn((Supplier) () -> new ParameterizedMessage("failed to load role [{}] index not available", + roleId), e); + roleActionListener.onResponse(RoleAndVersion.NON_EXISTENT); + } else { + logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); + roleActionListener.onFailure(e); + } } }); } else { From 84b631643cd50393c55f897b2961a2f61545e566 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 26 Oct 2016 21:05:49 +0200 Subject: [PATCH 2/2] Add utility method to fetch and collect results from a query (elastic/elasticsearch#3894) Today we have the same madness in two places and no dedicated test. This change moves the real madness into a single place and adds a test for it to make sure it actually works and isn't just crazy. Original commit: elastic/x-pack-elasticsearch@dabf5fdd630858944f9e9b707583b804d3c0a2ed --- .../xpack/security/InternalClient.java | 76 ++++++++++++++++ .../xpack/security/Security.java | 1 - .../authc/esnative/NativeUsersStore.java | 82 +++-------------- .../authz/store/NativeRolesStore.java | 88 ++----------------- .../security/InternalClientIntegTests.java | 50 +++++++++++ 5 files changed, 144 insertions(+), 153 deletions(-) create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java index 9eebfab525a..0f630626e73 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/InternalClient.java @@ -6,6 +6,12 @@ package org.elasticsearch.xpack.security; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; @@ -13,11 +19,18 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.node.Node; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.authc.Authentication; @@ -77,4 +90,67 @@ public class InternalClient extends FilterClient { throw new ElasticsearchException("failed to attach internal user to request", ioe); } } + + /** + * This method fetches all results for the given search request, parses them using the given hit parser and calls the + * listener once done. + */ + public static void fetchAllByEntity(Client client, SearchRequest request, final ActionListener> listener, + Function hitParser) { + final List results = new ArrayList<>(); + if (request.scroll() == null) { // we do scroll by default lets see if we can get rid of this at some point. + request.scroll(TimeValue.timeValueSeconds(10L)); + } + final Consumer clearScroll = (response) -> { + if (response != null && response.getScrollId() != null) { + ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(response.getScrollId()).request(); + client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {})); + } + }; + // This function is MADNESS! But it works, don't think about it too hard... + // simon edit: just watch this if you got this far https://www.youtube.com/watch?v=W-lF106Dgk8 + client.search(request, new ActionListener() { + private volatile SearchResponse lastResponse = null; + + @Override + public void onResponse(SearchResponse resp) { + try { + lastResponse = resp; + if (resp.getHits().getHits().length > 0) { + for (SearchHit hit : resp.getHits().getHits()) { + final T oneResult = hitParser.apply(hit); + if (oneResult != null) { + results.add(oneResult); + } + } + SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) + .setScroll(request.scroll().keepAlive()).request(); + client.searchScroll(scrollRequest, this); + + } else { + clearScroll.accept(resp); + // Finally, return the list of users + listener.onResponse(Collections.unmodifiableList(results)); + } + } catch (Exception e){ + onFailure(e); // lets clean up things + } + } + + @Override + public void onFailure(Exception t) { + try { + // attempt to clear the scroll request + clearScroll.accept(lastResponse); + } finally { + if (t instanceof IndexNotFoundException) { + // since this is expected to happen at times, we just call the listener with an empty list + listener.onResponse(Collections.emptyList()); + } else { + listener.onFailure(t); + } + } + } + }); + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java index f7857a92189..5de2fde245d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -404,7 +404,6 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { // authentication settings AnonymousUser.addSettings(settingsList); Realms.addSettings(settingsList); - NativeUsersStore.addSettings(settingsList); NativeRolesStore.addSettings(settingsList); AuthenticationService.addSettings(settingsList); AuthorizationService.addSettings(settingsList); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java index ced567299ea..6c19518ab8f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -33,8 +32,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; @@ -57,9 +54,7 @@ import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.User.Fields; import org.elasticsearch.xpack.security.user.XPackUser; -import org.elasticsearch.xpack.watcher.actions.Action; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -72,7 +67,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import static org.elasticsearch.xpack.security.Security.setting; import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate; /** @@ -84,12 +78,6 @@ import static org.elasticsearch.xpack.security.SecurityTemplateService.securityI */ public class NativeUsersStore extends AbstractComponent implements ClusterStateListener { - private static final Setting SCROLL_SIZE_SETTING = - Setting.intSetting(setting("authc.native.scroll.size"), 1000, Property.NodeScope); - - private static final Setting SCROLL_KEEP_ALIVE_SETTING = - Setting.timeSetting(setting("authc.native.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope); - public enum State { INITIALIZED, STARTING, @@ -106,8 +94,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final InternalClient client; private final boolean isTribeNode; - private int scrollSize; - private TimeValue scrollKeepAlive; private volatile boolean securityIndexExists = false; @@ -130,9 +116,9 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL } /** - * Retrieve a list of users, if usernames is null or empty, fetch all users + * Retrieve a list of users, if userNames is null or empty, fetch all users */ - public void getUsers(String[] usernames, final ActionListener> listener) { + public void getUsers(String[] userNames, final ActionListener> listener) { if (state() != State.STARTED) { logger.trace("attempted to get users before service was started"); listener.onFailure(new IllegalStateException("users cannot be retrieved as native user service has not been started")); @@ -147,68 +133,33 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL listener.onFailure(t); } }; - if (usernames.length == 1) { // optimization for single user lookup - final String username = usernames[0]; + if (userNames.length == 1) { // optimization for single user lookup + final String username = userNames[0]; getUserAndPassword(username, ActionListener.wrap( (uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())), handleException::accept)); } else { try { - final List users = new ArrayList<>(); - QueryBuilder query; - if (usernames == null || usernames.length == 0) { + final QueryBuilder query; + if (userNames == null || userNames.length == 0) { query = QueryBuilders.matchAllQuery(); } else { - query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(usernames)); + query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(userNames)); } SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME) - .setScroll(scrollKeepAlive) + .setScroll(TimeValue.timeValueSeconds(10L)) .setTypes(USER_DOC_TYPE) .setQuery(query) - .setSize(scrollSize) + .setSize(1000) .setFetchSource(true) .request(); request.indicesOptions().ignoreUnavailable(); - - // This function is MADNESS! But it works, don't think about it too hard... - client.search(request, new ActionListener() { - - private SearchResponse lastResponse = null; - - @Override - public void onResponse(final SearchResponse resp) { - lastResponse = resp; - boolean hasHits = resp.getHits().getHits().length > 0; - if (hasHits) { - for (SearchHit hit : resp.getHits().getHits()) { - UserAndPassword u = transformUser(hit.getId(), hit.getSource()); - if (u != null) { - users.add(u.user()); - } - } - SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) - .setScroll(scrollKeepAlive).request(); - client.searchScroll(scrollRequest, this); - } else { - if (resp.getScrollId() != null) { - clearScrollResponse(resp.getScrollId()); - } - // Finally, return the list of users - listener.onResponse(Collections.unmodifiableList(users)); - } - } - - @Override - public void onFailure(Exception t) { - // attempt to clear scroll response - if (lastResponse != null && lastResponse.getScrollId() != null) { - clearScrollResponse(lastResponse.getScrollId()); - } - handleException.accept(t); - } + InternalClient.fetchAllByEntity(client, request, listener, (hit) -> { + UserAndPassword u = transformUser(hit.getId(), hit.getSource()); + return u != null ? u.user() : null; }); } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(usernames)), e); + logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(userNames)), e); listener.onFailure(e); } } @@ -590,8 +541,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL public void start() { try { if (state.compareAndSet(State.INITIALIZED, State.STARTING)) { - this.scrollSize = SCROLL_SIZE_SETTING.get(settings); - this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings); state.set(State.STARTED); } } catch (Exception e) { @@ -841,9 +790,4 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL this.enabled = enabled; } } - - public static void addSettings(List> settings) { - settings.add(SCROLL_SIZE_SETTING); - settings.add(SCROLL_KEEP_ALIVE_SETTING); - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index 6403dd7d0d2..7ea22d8dec4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -16,14 +16,10 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.MultiSearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse.Item; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -45,7 +41,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.SecurityTemplateService; @@ -58,8 +53,8 @@ import org.elasticsearch.xpack.security.authz.permission.IndicesPermission.Group import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.client.SecurityClient; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -95,12 +90,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL FAILED } - private static final Setting SCROLL_SIZE_SETTING = - Setting.intSetting(setting("authz.store.roles.index.scroll.size"), 1000, Property.NodeScope); - - private static final Setting SCROLL_KEEP_ALIVE_SETTING = - Setting.timeSetting(setting("authz.store.roles.index.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope); - private static final Setting CACHE_SIZE_SETTING = Setting.intSetting(setting("authz.store.roles.index.cache.max_size"), 10000, Property.NodeScope); private static final Setting CACHE_TTL_SETTING = @@ -126,8 +115,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } private SecurityClient securityClient; - private int scrollSize; - private TimeValue scrollKeepAlive; // incremented each time the cache is invalidated private final AtomicLong numInvalidation = new AtomicLong(0); @@ -183,8 +170,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL try { if (state.compareAndSet(State.INITIALIZED, State.STARTING)) { this.securityClient = new SecurityClient(client); - this.scrollSize = SCROLL_SIZE_SETTING.get(settings); - this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings); state.set(State.STARTED); } } catch (Exception e) { @@ -202,7 +187,7 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL /** * Retrieve a list of roles, if rolesToGet is null or empty, fetch all roles */ - public void getRoleDescriptors(String[] names, final ActionListener> listener) { + public void getRoleDescriptors(String[] names, final ActionListener> listener) { if (state() != State.STARTED) { logger.trace("attempted to get roles before service was started"); listener.onFailure(new IllegalStateException("roles cannot be retrieved as native role service has not been started")); @@ -214,7 +199,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL : Collections.singletonList(roleAndVersion.getRoleDescriptor())), listener::onFailure)); } else { try { - final List roles = new ArrayList<>(); QueryBuilder query; if (names == null || names.length == 0) { query = QueryBuilders.matchAllQuery(); @@ -223,57 +207,13 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME) .setTypes(ROLE_DOC_TYPE) - .setScroll(scrollKeepAlive) + .setScroll(TimeValue.timeValueSeconds(10L)) .setQuery(query) - .setSize(scrollSize) + .setSize(1000) .setFetchSource(true) .request(); request.indicesOptions().ignoreUnavailable(); - - // This function is MADNESS! But it works, don't think about it too hard... - client.search(request, new ActionListener() { - - private SearchResponse lastResponse = null; - - @Override - public void onResponse(SearchResponse resp) { - lastResponse = resp; - boolean hasHits = resp.getHits().getHits().length > 0; - if (hasHits) { - for (SearchHit hit : resp.getHits().getHits()) { - RoleDescriptor rd = transformRole(hit.getId(), hit.getSourceRef(), logger); - if (rd != null) { - roles.add(rd); - } - } - SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId()) - .setScroll(scrollKeepAlive).request(); - client.searchScroll(scrollRequest, this); - } else { - if (resp.getScrollId() != null) { - clearScollRequest(resp.getScrollId()); - } - // Finally, return the list of users - listener.onResponse(Collections.unmodifiableList(roles)); - } - } - - @Override - public void onFailure(Exception t) { - // attempt to clear the scroll request - if (lastResponse != null && lastResponse.getScrollId() != null) { - clearScollRequest(lastResponse.getScrollId()); - } - - if (t instanceof IndexNotFoundException) { - logger.trace("could not retrieve roles because security index does not exist"); - // since this is expected to happen at times, we just call the listener with an empty list - listener.onResponse(Collections.emptyList()); - } else { - listener.onFailure(t); - } - } - }); + InternalClient.fetchAllByEntity(client, request, listener, (hit) -> transformRole(hit.getId(), hit.getSourceRef(), logger)); } catch (Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("unable to retrieve roles {}", Arrays.toString(names)), e); listener.onFailure(e); @@ -523,22 +463,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } } - private void clearScollRequest(final String scrollId) { - ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(scrollId).request(); - client.clearScroll(clearScrollRequest, new ActionListener() { - @Override - public void onResponse(ClearScrollResponse response) { - // cool, it cleared, we don't really care though... - } - - @Override - public void onFailure(Exception t) { - // Not really much to do here except for warn about it... - logger.warn( - (Supplier) () -> new ParameterizedMessage("failed to clear scroll [{}] after retrieving roles", scrollId), t); - } - }); - } // FIXME hack for testing public void reset() { @@ -658,8 +582,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL } public static void addSettings(List> settings) { - settings.add(SCROLL_SIZE_SETTING); - settings.add(SCROLL_KEEP_ALIVE_SETTING); settings.add(CACHE_SIZE_SETTING); settings.add(CACHE_TTL_SETTING); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java new file mode 100644 index 00000000000..b594c7a4322 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security; + +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class InternalClientIntegTests extends ESSingleNodeTestCase { + + public void testFetchAllEntities() throws ExecutionException, InterruptedException { + Client client = client(); + int numDocs = randomIntBetween(5, 30); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex("foo", "bar").setSource(Collections.singletonMap("number", i)).get(); + } + client.admin().indices().prepareRefresh("foo").get(); + SearchRequest request = client.prepareSearch() + .setScroll(TimeValue.timeValueHours(10L)) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(randomIntBetween(1, 10)) + .setFetchSource(true) + .request(); + request.indicesOptions().ignoreUnavailable(); + PlainActionFuture> future = new PlainActionFuture<>(); + InternalClient.fetchAllByEntity(client(), request, future, (hit) -> Integer.parseInt(hit.sourceAsMap().get("number").toString())); + Collection integers = future.actionGet(); + ArrayList list = new ArrayList<>(integers); + CollectionUtil.timSort(list); + assertEquals(numDocs, list.size()); + for (int i = 0; i < numDocs; i++) { + assertEquals(list.get(i).intValue(), i); + } + } +}