Add utility method to fetch and collect results from a query (elastic/elasticsearch#3894)

Today we have the same madness in two places and no dedicated test. This
change moves the real madness into a single place and adds a test for it
to make sure it actually works and isn't just crazy.

Original commit: elastic/x-pack-elasticsearch@dabf5fdd63
This commit is contained in:
Simon Willnauer 2016-10-26 21:05:49 +02:00 committed by GitHub
parent 163e5feb6e
commit 84b631643c
5 changed files with 144 additions and 153 deletions

View File

@ -6,6 +6,12 @@
package org.elasticsearch.xpack.security;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
@ -13,11 +19,18 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.ContextPreservingActionListener;
import org.elasticsearch.xpack.security.authc.Authentication;
@ -77,4 +90,67 @@ public class InternalClient extends FilterClient {
throw new ElasticsearchException("failed to attach internal user to request", ioe);
}
}
/**
* This method fetches all results for the given search request, parses them using the given hit parser and calls the
* listener once done.
*/
public static <T> void fetchAllByEntity(Client client, SearchRequest request, final ActionListener<Collection<T>> listener,
Function<SearchHit, T> hitParser) {
final List<T> results = new ArrayList<>();
if (request.scroll() == null) { // we do scroll by default lets see if we can get rid of this at some point.
request.scroll(TimeValue.timeValueSeconds(10L));
}
final Consumer<SearchResponse> clearScroll = (response) -> {
if (response != null && response.getScrollId() != null) {
ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(response.getScrollId()).request();
client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {}));
}
};
// This function is MADNESS! But it works, don't think about it too hard...
// simon edit: just watch this if you got this far https://www.youtube.com/watch?v=W-lF106Dgk8
client.search(request, new ActionListener<SearchResponse>() {
private volatile SearchResponse lastResponse = null;
@Override
public void onResponse(SearchResponse resp) {
try {
lastResponse = resp;
if (resp.getHits().getHits().length > 0) {
for (SearchHit hit : resp.getHits().getHits()) {
final T oneResult = hitParser.apply(hit);
if (oneResult != null) {
results.add(oneResult);
}
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
.setScroll(request.scroll().keepAlive()).request();
client.searchScroll(scrollRequest, this);
} else {
clearScroll.accept(resp);
// Finally, return the list of users
listener.onResponse(Collections.unmodifiableList(results));
}
} catch (Exception e){
onFailure(e); // lets clean up things
}
}
@Override
public void onFailure(Exception t) {
try {
// attempt to clear the scroll request
clearScroll.accept(lastResponse);
} finally {
if (t instanceof IndexNotFoundException) {
// since this is expected to happen at times, we just call the listener with an empty list
listener.onResponse(Collections.<T>emptyList());
} else {
listener.onFailure(t);
}
}
}
});
}
}

View File

@ -404,7 +404,6 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
// authentication settings
AnonymousUser.addSettings(settingsList);
Realms.addSettings(settingsList);
NativeUsersStore.addSettings(settingsList);
NativeRolesStore.addSettings(settingsList);
AuthenticationService.addSettings(settingsList);
AuthorizationService.addSettings(settingsList);

View File

@ -22,7 +22,6 @@ import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -33,8 +32,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
@ -57,9 +54,7 @@ import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.User.Fields;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.watcher.actions.Action;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -72,7 +67,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate;
/**
@ -84,12 +78,6 @@ import static org.elasticsearch.xpack.security.SecurityTemplateService.securityI
*/
public class NativeUsersStore extends AbstractComponent implements ClusterStateListener {
private static final Setting<Integer> SCROLL_SIZE_SETTING =
Setting.intSetting(setting("authc.native.scroll.size"), 1000, Property.NodeScope);
private static final Setting<TimeValue> SCROLL_KEEP_ALIVE_SETTING =
Setting.timeSetting(setting("authc.native.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope);
public enum State {
INITIALIZED,
STARTING,
@ -106,8 +94,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final InternalClient client;
private final boolean isTribeNode;
private int scrollSize;
private TimeValue scrollKeepAlive;
private volatile boolean securityIndexExists = false;
@ -130,9 +116,9 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
}
/**
* Retrieve a list of users, if usernames is null or empty, fetch all users
* Retrieve a list of users, if userNames is null or empty, fetch all users
*/
public void getUsers(String[] usernames, final ActionListener<Collection<User>> listener) {
public void getUsers(String[] userNames, final ActionListener<Collection<User>> listener) {
if (state() != State.STARTED) {
logger.trace("attempted to get users before service was started");
listener.onFailure(new IllegalStateException("users cannot be retrieved as native user service has not been started"));
@ -147,68 +133,33 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
listener.onFailure(t);
}
};
if (usernames.length == 1) { // optimization for single user lookup
final String username = usernames[0];
if (userNames.length == 1) { // optimization for single user lookup
final String username = userNames[0];
getUserAndPassword(username, ActionListener.wrap(
(uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())),
handleException::accept));
} else {
try {
final List<User> users = new ArrayList<>();
QueryBuilder query;
if (usernames == null || usernames.length == 0) {
final QueryBuilder query;
if (userNames == null || userNames.length == 0) {
query = QueryBuilders.matchAllQuery();
} else {
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(usernames));
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(userNames));
}
SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setScroll(scrollKeepAlive)
.setScroll(TimeValue.timeValueSeconds(10L))
.setTypes(USER_DOC_TYPE)
.setQuery(query)
.setSize(scrollSize)
.setSize(1000)
.setFetchSource(true)
.request();
request.indicesOptions().ignoreUnavailable();
// This function is MADNESS! But it works, don't think about it too hard...
client.search(request, new ActionListener<SearchResponse>() {
private SearchResponse lastResponse = null;
@Override
public void onResponse(final SearchResponse resp) {
lastResponse = resp;
boolean hasHits = resp.getHits().getHits().length > 0;
if (hasHits) {
for (SearchHit hit : resp.getHits().getHits()) {
InternalClient.fetchAllByEntity(client, request, listener, (hit) -> {
UserAndPassword u = transformUser(hit.getId(), hit.getSource());
if (u != null) {
users.add(u.user());
}
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
.setScroll(scrollKeepAlive).request();
client.searchScroll(scrollRequest, this);
} else {
if (resp.getScrollId() != null) {
clearScrollResponse(resp.getScrollId());
}
// Finally, return the list of users
listener.onResponse(Collections.unmodifiableList(users));
}
}
@Override
public void onFailure(Exception t) {
// attempt to clear scroll response
if (lastResponse != null && lastResponse.getScrollId() != null) {
clearScrollResponse(lastResponse.getScrollId());
}
handleException.accept(t);
}
return u != null ? u.user() : null;
});
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(usernames)), e);
logger.error((Supplier<?>) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(userNames)), e);
listener.onFailure(e);
}
}
@ -590,8 +541,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
public void start() {
try {
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.scrollSize = SCROLL_SIZE_SETTING.get(settings);
this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings);
state.set(State.STARTED);
}
} catch (Exception e) {
@ -841,9 +790,4 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
this.enabled = enabled;
}
}
public static void addSettings(List<Setting<?>> settings) {
settings.add(SCROLL_SIZE_SETTING);
settings.add(SCROLL_KEEP_ALIVE_SETTING);
}
}

View File

@ -16,14 +16,10 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.MultiSearchResponse.Item;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -45,7 +41,6 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.SecurityTemplateService;
@ -58,8 +53,8 @@ import org.elasticsearch.xpack.security.authz.permission.IndicesPermission.Group
import org.elasticsearch.xpack.security.authz.permission.Role;
import org.elasticsearch.xpack.security.client.SecurityClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -95,12 +90,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
FAILED
}
private static final Setting<Integer> SCROLL_SIZE_SETTING =
Setting.intSetting(setting("authz.store.roles.index.scroll.size"), 1000, Property.NodeScope);
private static final Setting<TimeValue> SCROLL_KEEP_ALIVE_SETTING =
Setting.timeSetting(setting("authz.store.roles.index.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope);
private static final Setting<Integer> CACHE_SIZE_SETTING =
Setting.intSetting(setting("authz.store.roles.index.cache.max_size"), 10000, Property.NodeScope);
private static final Setting<TimeValue> CACHE_TTL_SETTING =
@ -126,8 +115,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
}
private SecurityClient securityClient;
private int scrollSize;
private TimeValue scrollKeepAlive;
// incremented each time the cache is invalidated
private final AtomicLong numInvalidation = new AtomicLong(0);
@ -183,8 +170,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
try {
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.securityClient = new SecurityClient(client);
this.scrollSize = SCROLL_SIZE_SETTING.get(settings);
this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings);
state.set(State.STARTED);
}
} catch (Exception e) {
@ -202,7 +187,7 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
/**
* Retrieve a list of roles, if rolesToGet is null or empty, fetch all roles
*/
public void getRoleDescriptors(String[] names, final ActionListener<List<RoleDescriptor>> listener) {
public void getRoleDescriptors(String[] names, final ActionListener<Collection<RoleDescriptor>> listener) {
if (state() != State.STARTED) {
logger.trace("attempted to get roles before service was started");
listener.onFailure(new IllegalStateException("roles cannot be retrieved as native role service has not been started"));
@ -214,7 +199,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
: Collections.singletonList(roleAndVersion.getRoleDescriptor())), listener::onFailure));
} else {
try {
final List<RoleDescriptor> roles = new ArrayList<>();
QueryBuilder query;
if (names == null || names.length == 0) {
query = QueryBuilders.matchAllQuery();
@ -223,57 +207,13 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
}
SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setTypes(ROLE_DOC_TYPE)
.setScroll(scrollKeepAlive)
.setScroll(TimeValue.timeValueSeconds(10L))
.setQuery(query)
.setSize(scrollSize)
.setSize(1000)
.setFetchSource(true)
.request();
request.indicesOptions().ignoreUnavailable();
// This function is MADNESS! But it works, don't think about it too hard...
client.search(request, new ActionListener<SearchResponse>() {
private SearchResponse lastResponse = null;
@Override
public void onResponse(SearchResponse resp) {
lastResponse = resp;
boolean hasHits = resp.getHits().getHits().length > 0;
if (hasHits) {
for (SearchHit hit : resp.getHits().getHits()) {
RoleDescriptor rd = transformRole(hit.getId(), hit.getSourceRef(), logger);
if (rd != null) {
roles.add(rd);
}
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
.setScroll(scrollKeepAlive).request();
client.searchScroll(scrollRequest, this);
} else {
if (resp.getScrollId() != null) {
clearScollRequest(resp.getScrollId());
}
// Finally, return the list of users
listener.onResponse(Collections.unmodifiableList(roles));
}
}
@Override
public void onFailure(Exception t) {
// attempt to clear the scroll request
if (lastResponse != null && lastResponse.getScrollId() != null) {
clearScollRequest(lastResponse.getScrollId());
}
if (t instanceof IndexNotFoundException) {
logger.trace("could not retrieve roles because security index does not exist");
// since this is expected to happen at times, we just call the listener with an empty list
listener.onResponse(Collections.<RoleDescriptor>emptyList());
} else {
listener.onFailure(t);
}
}
});
InternalClient.fetchAllByEntity(client, request, listener, (hit) -> transformRole(hit.getId(), hit.getSourceRef(), logger));
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unable to retrieve roles {}", Arrays.toString(names)), e);
listener.onFailure(e);
@ -523,22 +463,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
}
}
private void clearScollRequest(final String scrollId) {
ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(scrollId).request();
client.clearScroll(clearScrollRequest, new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse response) {
// cool, it cleared, we don't really care though...
}
@Override
public void onFailure(Exception t) {
// Not really much to do here except for warn about it...
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage("failed to clear scroll [{}] after retrieving roles", scrollId), t);
}
});
}
// FIXME hack for testing
public void reset() {
@ -658,8 +582,6 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
}
public static void addSettings(List<Setting<?>> settings) {
settings.add(SCROLL_SIZE_SETTING);
settings.add(SCROLL_KEEP_ALIVE_SETTING);
settings.add(CACHE_SIZE_SETTING);
settings.add(CACHE_TTL_SETTING);
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class InternalClientIntegTests extends ESSingleNodeTestCase {
public void testFetchAllEntities() throws ExecutionException, InterruptedException {
Client client = client();
int numDocs = randomIntBetween(5, 30);
for (int i = 0; i < numDocs; i++) {
client.prepareIndex("foo", "bar").setSource(Collections.singletonMap("number", i)).get();
}
client.admin().indices().prepareRefresh("foo").get();
SearchRequest request = client.prepareSearch()
.setScroll(TimeValue.timeValueHours(10L))
.setQuery(QueryBuilders.matchAllQuery())
.setSize(randomIntBetween(1, 10))
.setFetchSource(true)
.request();
request.indicesOptions().ignoreUnavailable();
PlainActionFuture<Collection<Integer>> future = new PlainActionFuture<>();
InternalClient.fetchAllByEntity(client(), request, future, (hit) -> Integer.parseInt(hit.sourceAsMap().get("number").toString()));
Collection<Integer> integers = future.actionGet();
ArrayList<Integer> list = new ArrayList<>(integers);
CollectionUtil.timSort(list);
assertEquals(numDocs, list.size());
for (int i = 0; i < numDocs; i++) {
assertEquals(list.get(i).intValue(), i);
}
}
}