Remove all blocking calls from TransportGetUsersAction (elastic/elasticsearch#3876)

`TransportGetUsersAction` does some funky blocking calls even though
it's specifying `SAME` as the thread-pool indicating that it's fast or
forking off quickly. Both might not be true today. This change adds
async support to the methods it calls without breaking the existing
Realm interface. Yet, we might need to do this down the road.

Original commit: elastic/x-pack-elasticsearch@d0959f87f3
This commit is contained in:
Simon Willnauer 2016-10-25 22:11:19 +02:00 committed by GitHub
parent 542a484031
commit 0b24f022f7
10 changed files with 241 additions and 234 deletions

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.xpack.security.support.Exceptions;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -50,7 +50,7 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
delegate.onFailure(failure.get());
} else {
List<T> collect = this.roles.asList().stream().map((e)
-> e.value).filter(r -> r != null).collect(Collectors.toList());
-> e.value).filter(Objects::nonNull).collect(Collectors.toList());
collect.addAll(defaults);
delegate.onResponse(Collections.unmodifiableList(collect));
}
@ -66,4 +66,4 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
delegate.onFailure(failure.get());
}
}
}
}

View File

@ -15,6 +15,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.common.GroupedActionListener;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
import org.elasticsearch.xpack.security.user.SystemUser;
@ -22,7 +24,11 @@ import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackUser;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.common.Strings.arrayToDelimitedString;
@ -47,15 +53,11 @@ public class TransportGetUsersAction extends HandledTransportAction<GetUsersRequ
final boolean specificUsersRequested = requestedUsers != null && requestedUsers.length > 0;
final List<String> usersToSearchFor = new ArrayList<>();
final List<User> users = new ArrayList<>();
final List<String> realmLookup = new ArrayList<>();
if (specificUsersRequested) {
for (String username : requestedUsers) {
if (ReservedRealm.isReserved(username, settings)) {
User user = reservedRealm.lookupUser(username);
// a user could be null if the service isn't ready or we requested the anonymous user and it is not enabled
if (user != null) {
users.add(user);
}
realmLookup.add(username);
} else if (SystemUser.NAME.equals(username) || XPackUser.NAME.equals(username)) {
listener.onFailure(new IllegalArgumentException("user [" + username + "] is internal"));
return;
@ -63,46 +65,39 @@ public class TransportGetUsersAction extends HandledTransportAction<GetUsersRequ
usersToSearchFor.add(username);
}
}
} else {
users.addAll(reservedRealm.users());
}
if (usersToSearchFor.size() == 1) {
final String username = usersToSearchFor.get(0);
// We can fetch a single user with a get, much cheaper:
usersStore.getUser(username, new ActionListener<User>() {
@Override
public void onResponse(User user) {
if (user != null) {
users.add(user);
}
listener.onResponse(new GetUsersResponse(users));
}
@Override
public void onFailure(Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to retrieve user [{}]", username), e);
listener.onFailure(e);
}
});
} else if (specificUsersRequested && usersToSearchFor.isEmpty()) {
listener.onResponse(new GetUsersResponse(users));
final ActionListener<Collection<Collection<User>>> sendingListener = ActionListener.wrap((userLists) -> {
users.addAll(userLists.stream().flatMap(Collection::stream).filter(Objects::nonNull).collect(Collectors.toList()));
listener.onResponse(new GetUsersResponse(users));
}, listener::onFailure);
final GroupedActionListener<Collection<User>> groupListener =
new GroupedActionListener<>(sendingListener, 2, Collections.emptyList());
// We have two sources for the users object, the reservedRealm and the usersStore, we query both at the same time with a
// GroupedActionListener
if (realmLookup.isEmpty()) {
if (specificUsersRequested == false) {
// we get all users from the realm
reservedRealm.users(groupListener);
} else {
groupListener.onResponse(Collections.emptyList());// pass an empty list to inform the group listener
// - no real lookups necessary
}
} else {
usersStore.getUsers(usersToSearchFor.toArray(new String[usersToSearchFor.size()]), new ActionListener<List<User>>() {
@Override
public void onResponse(List<User> usersFound) {
users.addAll(usersFound);
listener.onResponse(new GetUsersResponse(users));
}
@Override
public void onFailure(Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to retrieve user [{}]", arrayToDelimitedString(request.usernames(), ",")), e);
listener.onFailure(e);
}
});
// nested group listener action here - for each of the users we got and fetch it concurrently - once we are done we notify
// the "global" group listener.
GroupedActionListener<User> realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size(),
Collections.emptyList());
for (String user : realmLookup) {
reservedRealm.lookupUser(user, realmGroupListener);
}
}
// user store lookups
if (specificUsersRequested && usersToSearchFor.isEmpty()) {
groupListener.onResponse(Collections.emptyList()); // no users requested notify
} else {
// go and get all users from the users store and pass it directly on to the group listener
usersStore.getUsers(usersToSearchFor.toArray(new String[usersToSearchFor.size()]), groupListener);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.security.authc;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.xpack.security.user.User;
@ -87,6 +88,10 @@ public abstract class Realm implements Comparable<Realm> {
*/
public abstract User lookupUser(String username);
public void lookupUser(String username, ActionListener<User> listener) {
listener.onResponse(lookupUser(username));
}
public Map<String, Object> usageStats() {
Map<String, Object> stats = new HashMap<>();
stats.put("name", name());

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.security.authc.esnative;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.security.authc.RealmConfig;
import org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
@ -33,7 +34,13 @@ public class NativeRealm extends CachingUsernamePasswordRealm {
protected User doLookupUser(String username) {
return userStore.getUser(username);
}
@Override
protected void doLookupUser(String username, ActionListener<User> listener) {
userStore.getUsers(new String[] {username}, ActionListener.wrap(c -> listener.onResponse(c.stream().findAny().orElse(null)),
listener::onFailure));
}
@Override
protected User doAuthenticate(UsernamePasswordToken token) {
return userStore.verifyPassword(token.principal(), token.credentials());

View File

@ -57,9 +57,11 @@ import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.User.Fields;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.watcher.actions.Action;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -68,6 +70,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate;
@ -126,110 +129,88 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
return uap == null ? null : uap.user();
}
/**
* Retrieve a single user, calling the listener when retrieved
*/
public void getUser(String username, final ActionListener<User> listener) {
if (state() != State.STARTED) {
logger.trace("attempted to get user [{}] before service was started", username);
listener.onFailure(new IllegalStateException("user cannot be retrieved as native user service has not been started"));
return;
}
getUserAndPassword(username, new ActionListener<UserAndPassword>() {
@Override
public void onResponse(UserAndPassword uap) {
listener.onResponse(uap == null ? null : uap.user());
}
@Override
public void onFailure(Exception t) {
if (t instanceof IndexNotFoundException) {
logger.trace("failed to retrieve user [{}] since security index does not exist", username);
// We don't invoke the onFailure listener here, instead
// we call the response with a null user
listener.onResponse(null);
} else {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to retrieve user [{}]", username), t);
listener.onFailure(t);
}
}
});
}
/**
* Retrieve a list of users, if usernames is null or empty, fetch all users
*/
public void getUsers(String[] usernames, final ActionListener<List<User>> listener) {
public void getUsers(String[] usernames, final ActionListener<Collection<User>> listener) {
if (state() != State.STARTED) {
logger.trace("attempted to get users before service was started");
listener.onFailure(new IllegalStateException("users cannot be retrieved as native user service has not been started"));
return;
}
try {
final List<User> users = new ArrayList<>();
QueryBuilder query;
if (usernames == null || usernames.length == 0) {
query = QueryBuilders.matchAllQuery();
final Consumer<Exception> handleException = (t) -> {
if (t instanceof IndexNotFoundException) {
logger.trace("could not retrieve users because security index does not exist");
// We don't invoke the onFailure listener here, instead just pass an empty list
listener.onResponse(Collections.emptyList());
} else {
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(usernames));
listener.onFailure(t);
}
SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setScroll(scrollKeepAlive)
.setTypes(USER_DOC_TYPE)
.setQuery(query)
.setSize(scrollSize)
.setFetchSource(true)
.request();
request.indicesOptions().ignoreUnavailable();
};
if (usernames.length == 1) { // optimization for single user lookup
final String username = usernames[0];
getUserAndPassword(username, ActionListener.wrap(
(uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())),
handleException::accept));
} else {
try {
final List<User> users = new ArrayList<>();
QueryBuilder query;
if (usernames == null || usernames.length == 0) {
query = QueryBuilders.matchAllQuery();
} else {
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(USER_DOC_TYPE).addIds(usernames));
}
SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setScroll(scrollKeepAlive)
.setTypes(USER_DOC_TYPE)
.setQuery(query)
.setSize(scrollSize)
.setFetchSource(true)
.request();
request.indicesOptions().ignoreUnavailable();
// This function is MADNESS! But it works, don't think about it too hard...
client.search(request, new ActionListener<SearchResponse>() {
// This function is MADNESS! But it works, don't think about it too hard...
client.search(request, new ActionListener<SearchResponse>() {
private SearchResponse lastResponse = null;
private SearchResponse lastResponse = null;
@Override
public void onResponse(final SearchResponse resp) {
lastResponse = resp;
boolean hasHits = resp.getHits().getHits().length > 0;
if (hasHits) {
for (SearchHit hit : resp.getHits().getHits()) {
UserAndPassword u = transformUser(hit.getId(), hit.getSource());
if (u != null) {
users.add(u.user());
@Override
public void onResponse(final SearchResponse resp) {
lastResponse = resp;
boolean hasHits = resp.getHits().getHits().length > 0;
if (hasHits) {
for (SearchHit hit : resp.getHits().getHits()) {
UserAndPassword u = transformUser(hit.getId(), hit.getSource());
if (u != null) {
users.add(u.user());
}
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
.setScroll(scrollKeepAlive).request();
client.searchScroll(scrollRequest, this);
} else {
if (resp.getScrollId() != null) {
clearScrollResponse(resp.getScrollId());
}
// Finally, return the list of users
listener.onResponse(Collections.unmodifiableList(users));
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
.setScroll(scrollKeepAlive).request();
client.searchScroll(scrollRequest, this);
} else {
if (resp.getScrollId() != null) {
clearScrollResponse(resp.getScrollId());
}
@Override
public void onFailure(Exception t) {
// attempt to clear scroll response
if (lastResponse != null && lastResponse.getScrollId() != null) {
clearScrollResponse(lastResponse.getScrollId());
}
// Finally, return the list of users
listener.onResponse(Collections.unmodifiableList(users));
handleException.accept(t);
}
}
@Override
public void onFailure(Exception t) {
// attempt to clear scroll response
if (lastResponse != null && lastResponse.getScrollId() != null) {
clearScrollResponse(lastResponse.getScrollId());
}
if (t instanceof IndexNotFoundException) {
logger.trace("could not retrieve users because security index does not exist");
// We don't invoke the onFailure listener here, instead just pass an empty list
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(t);
}
}
});
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(usernames)), e);
listener.onFailure(e);
});
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unable to retrieve users {}", Arrays.toString(usernames)), e);
listener.onFailure(e);
}
}
}
@ -661,6 +642,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
final AtomicReference<ReservedUserInfo> userInfoRef = new AtomicReference<>();
final AtomicReference<Exception> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
client.prepareGet(SecurityTemplateService.SECURITY_INDEX_NAME, RESERVED_USER_DOC_TYPE, username)
.execute(new LatchedActionListener<>(new ActionListener<GetResponse>() {
@Override
@ -710,18 +692,16 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
return userInfoRef.get();
}
Map<String, ReservedUserInfo> getAllReservedUserInfo() throws Exception {
void getAllReservedUserInfo(ActionListener<Map<String, ReservedUserInfo>> listener) {
assert started();
final Map<String, ReservedUserInfo> userInfos = new HashMap<>();
final AtomicReference<Exception> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setTypes(RESERVED_USER_DOC_TYPE)
.setQuery(QueryBuilders.matchAllQuery())
.setFetchSource(true)
.execute(new LatchedActionListener<>(new ActionListener<SearchResponse>() {
.execute(new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
Map<String, ReservedUserInfo> userInfos = new HashMap<>();
assert searchResponse.getHits().getTotalHits() <= 10 : "there are more than 10 reserved users we need to change " +
"this to retrieve them all!";
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
@ -729,43 +709,29 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
String password = (String) sourceMap.get(User.Fields.PASSWORD.getPreferredName());
Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName());
if (password == null || password.isEmpty()) {
failure.set(new IllegalStateException("password hash must not be empty!"));
listener.onFailure(new IllegalStateException("password hash must not be empty!"));
break;
} else if (enabled == null) {
failure.set(new IllegalStateException("enabled must not be null!"));
listener.onFailure(new IllegalStateException("enabled must not be null!"));
break;
} else {
userInfos.put(searchHit.getId(), new ReservedUserInfo(password.toCharArray(), enabled));
}
}
listener.onResponse(userInfos);
}
@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
logger.trace("could not retrieve built in users since security index does not exist", e);
listener.onResponse(Collections.emptyMap());
} else {
logger.error("failed to retrieve built in users", e);
failure.set(e);
listener.onFailure(e);
}
}
}, latch));
try {
final boolean responseReceived = latch.await(30, TimeUnit.SECONDS);
if (responseReceived == false) {
failure.set(new TimeoutException("timed out trying to get built in users"));
}
} catch (InterruptedException e) {
failure.set(e);
}
Exception failureCause = failure.get();
if (failureCause != null) {
// if there is any sort of failure we need to throw an exception to prevent the fallback to the default password...
throw failureCause;
}
return userInfos;
});
}
private void clearScrollResponse(String scrollId) {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.security.authc.esnative;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.XPackSettings;
@ -133,27 +134,26 @@ public class ReservedRealm extends CachingUsernamePasswordRealm {
}
}
public Collection<User> users() {
public void users(ActionListener<Collection<User>> listener) {
if (nativeUsersStore.started() == false || enabled == false) {
return anonymousEnabled ? Collections.singletonList(anonymousUser) : Collections.emptyList();
listener.onResponse(anonymousEnabled ? Collections.singletonList(anonymousUser) : Collections.emptyList());
} else {
nativeUsersStore.getAllReservedUserInfo(ActionListener.wrap((reservedUserInfos) -> {
List<User> users = new ArrayList<>(3);
ReservedUserInfo userInfo = reservedUserInfos.get(ElasticUser.NAME);
users.add(new ElasticUser(userInfo == null || userInfo.enabled));
userInfo = reservedUserInfos.get(KibanaUser.NAME);
users.add(new KibanaUser(userInfo == null || userInfo.enabled));
if (anonymousEnabled) {
users.add(anonymousUser);
}
listener.onResponse(users);
}, (e) -> {
logger.error("failed to retrieve reserved users", e);
listener.onResponse(anonymousEnabled ? Collections.singletonList(anonymousUser) : Collections.emptyList());
}));
}
List<User> users = new ArrayList<>(3);
try {
Map<String, ReservedUserInfo> reservedUserInfos = nativeUsersStore.getAllReservedUserInfo();
ReservedUserInfo userInfo = reservedUserInfos.get(ElasticUser.NAME);
users.add(new ElasticUser(userInfo == null || userInfo.enabled));
userInfo = reservedUserInfos.get(KibanaUser.NAME);
users.add(new KibanaUser(userInfo == null || userInfo.enabled));
if (anonymousEnabled) {
users.add(anonymousUser);
}
} catch (Exception e) {
logger.error("failed to retrieve reserved users", e);
return anonymousEnabled ? Collections.singletonList(anonymousUser) : Collections.emptyList();
}
return users;
}
private ReservedUserInfo getUserInfo(final String username) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.authc.support;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.CacheLoader;
@ -140,6 +141,7 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm
}
}
@Override
public final User lookupUser(final String username) {
if (!userLookupSupported()) {
@ -184,8 +186,36 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm
protected abstract User doAuthenticate(UsernamePasswordToken token);
@Override
public void lookupUser(String username, ActionListener<User> listener) {
if (!userLookupSupported()) {
listener.onResponse(null);
} else {
UserWithHash withHash = cache.get(username);
if (withHash == null) {
doLookupUser(username, ActionListener.wrap((user) -> {
try {
if (user != null) {
UserWithHash userWithHash = new UserWithHash(user, null, null);
cache.computeIfAbsent(username, (n) -> userWithHash);
}
listener.onResponse(user);
} catch (ExecutionException e) {
listener.onFailure(e);
}
}, listener::onFailure));
} else {
listener.onResponse(withHash.user);
}
}
}
protected abstract User doLookupUser(String username);
protected void doLookupUser(String username, ActionListener<User> listener) {
listener.onResponse(doLookupUser(username));
}
private static class UserWithHash {
User user;
char[] hash;

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.action.user;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
@ -15,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealmTests;
import org.elasticsearch.xpack.security.user.AnonymousUser;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
@ -138,8 +140,11 @@ public class TransportGetUsersActionTests extends ESTestCase {
public void testReservedUsersOnly() {
NativeUsersStore usersStore = mock(NativeUsersStore.class);
when(usersStore.started()).thenReturn(true);
ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings));
final Collection<User> allReservedUsers = reservedRealm.users();
PlainActionFuture<Collection<User>> userFuture = new PlainActionFuture<>();
reservedRealm.users(userFuture);
final Collection<User> allReservedUsers = userFuture.actionGet();
final int size = randomIntBetween(1, allReservedUsers.size());
final List<User> reservedUsers = randomSubsetOf(size, allReservedUsers);
final List<String> names = reservedUsers.stream().map(User::principal).collect(Collectors.toList());
@ -176,6 +181,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
Arrays.asList(new User("jane"), new User("fred")), randomUsers());
NativeUsersStore usersStore = mock(NativeUsersStore.class);
when(usersStore.started()).thenReturn(true);
ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings));
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
@ -208,7 +214,9 @@ public class TransportGetUsersActionTests extends ESTestCase {
});
final List<User> expectedList = new ArrayList<>();
expectedList.addAll(reservedRealm.users());
PlainActionFuture<Collection<User>> userFuture = new PlainActionFuture<>();
reservedRealm.users(userFuture);
expectedList.addAll(userFuture.actionGet());
expectedList.addAll(storeUsers);
assertThat(throwableRef.get(), is(nullValue()));
@ -229,28 +237,13 @@ public class TransportGetUsersActionTests extends ESTestCase {
GetUsersRequest request = new GetUsersRequest();
request.usernames(storeUsernames);
if (storeUsernames.length > 1) {
doAnswer(new Answer() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<List<User>> listener = (ActionListener<List<User>>) args[1];
listener.onResponse(storeUsers);
return null;
}
}).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class));
} else {
assertThat(storeUsernames.length, is(1));
doAnswer(new Answer() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<User> listener = (ActionListener<User>) args[1];
listener.onResponse(storeUsers.get(0));
return null;
}
}).when(usersStore).getUser(eq(storeUsernames[0]), any(ActionListener.class));
}
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<List<User>> listener = (ActionListener<List<User>>) args[1];
listener.onResponse(storeUsers);
return null;
}).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class));
final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
final AtomicReference<GetUsersResponse> responseRef = new AtomicReference<>();
@ -275,7 +268,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
if (storeUsers.size() > 1) {
verify(usersStore, times(1)).getUsers(aryEq(storeUsernames), any(ActionListener.class));
} else {
verify(usersStore, times(1)).getUser(eq(storeUsernames[0]), any(ActionListener.class));
verify(usersStore, times(1)).getUsers(aryEq(new String[] {storeUsernames[0]}), any(ActionListener.class));
}
}
@ -292,28 +285,13 @@ public class TransportGetUsersActionTests extends ESTestCase {
GetUsersRequest request = new GetUsersRequest();
request.usernames(storeUsernames);
if (storeUsernames.length > 1) {
doAnswer(new Answer() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<List<User>> listener = (ActionListener<List<User>>) args[1];
listener.onFailure(e);
return null;
}
}).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class));
} else {
assertThat(storeUsernames.length, is(1));
doAnswer(new Answer() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<User> listener = (ActionListener<User>) args[1];
listener.onFailure(e);
return null;
}
}).when(usersStore).getUser(eq(storeUsernames[0]), any(ActionListener.class));
}
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 2;
ActionListener<List<User>> listener = (ActionListener<List<User>>) args[1];
listener.onFailure(e);
return null;
}).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class));
final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
final AtomicReference<GetUsersResponse> responseRef = new AtomicReference<>();
@ -332,11 +310,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
assertThat(throwableRef.get(), is(notNullValue()));
assertThat(throwableRef.get(), is(sameInstance(e)));
assertThat(responseRef.get(), is(nullValue()));
if (request.usernames().length == 1) {
verify(usersStore, times(1)).getUser(eq(request.usernames()[0]), any(ActionListener.class));
} else {
verify(usersStore, times(1)).getUsers(aryEq(storeUsernames), any(ActionListener.class));
}
verify(usersStore, times(1)).getUsers(aryEq(storeUsernames), any(ActionListener.class));
}
private List<User> randomUsers() {

View File

@ -8,12 +8,14 @@ package org.elasticsearch.xpack.security.action.user;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealmTests;
import org.elasticsearch.xpack.security.authc.support.Hasher;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.user.AnonymousUser;
@ -26,6 +28,8 @@ import org.elasticsearch.xpack.security.user.XPackUser;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
@ -110,9 +114,12 @@ public class TransportPutUserActionTests extends ESTestCase {
public void testReservedUser() {
NativeUsersStore usersStore = mock(NativeUsersStore.class);
when(usersStore.started()).thenReturn(true);
ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
ReservedRealm reservedRealm = new ReservedRealm(new Environment(settings), settings, usersStore, new AnonymousUser(settings));
final User reserved = randomFrom(reservedRealm.users().toArray(new User[0]));
PlainActionFuture<Collection<User>> userFuture = new PlainActionFuture<>();
reservedRealm.users(userFuture);
final User reserved = randomFrom(userFuture.actionGet().toArray(new User[0]));
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
TransportPutUserAction action = new TransportPutUserAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class),

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.security.authc.esnative;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.XPackSettings;
@ -20,12 +22,18 @@ import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -45,6 +53,7 @@ public class ReservedRealmTests extends ESTestCase {
public void setupMocks() {
usersStore = mock(NativeUsersStore.class);
when(usersStore.started()).thenReturn(true);
mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
}
public void testUserStoreNotStarted() {
@ -190,7 +199,9 @@ public class ReservedRealmTests extends ESTestCase {
public void testGetUsers() {
final ReservedRealm reservedRealm =
new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, new AnonymousUser(Settings.EMPTY));
assertThat(reservedRealm.users(), containsInAnyOrder(new ElasticUser(true), new KibanaUser(true)));
PlainActionFuture<Collection<User>> userFuture = new PlainActionFuture<>();
reservedRealm.users(userFuture);
assertThat(userFuture.actionGet(), containsInAnyOrder(new ElasticUser(true), new KibanaUser(true)));
}
public void testGetUsersDisabled() {
@ -201,10 +212,12 @@ public class ReservedRealmTests extends ESTestCase {
.build();
final AnonymousUser anonymousUser = new AnonymousUser(settings);
final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, anonymousUser);
PlainActionFuture<Collection<User>> userFuture = new PlainActionFuture<>();
reservedRealm.users(userFuture);
if (anonymousEnabled) {
assertThat(reservedRealm.users(), contains(anonymousUser));
assertThat(userFuture.actionGet(), contains(anonymousUser));
} else {
assertThat(reservedRealm.users(), empty());
assertThat(userFuture.actionGet(), empty());
}
}
@ -225,4 +238,14 @@ public class ReservedRealmTests extends ESTestCase {
assertThat(e.getMessage(), containsString("failed to authenticate"));
}
}
/*
* NativeUserStore#getAllReservedUserInfo is pkg private we can't mock it otherwise
*/
public static void mockGetAllReservedUserInfo(NativeUsersStore usersStore, Map<String, ReservedUserInfo> collection) {
doAnswer((i) -> {
((ActionListener) i.getArguments()[0]).onResponse(collection);
return null;
}).when(usersStore).getAllReservedUserInfo(any(ActionListener.class));
}
}