From a50bc7946bd52032c4cd4c566bce8e834cf9a80d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 25 Oct 2016 17:28:29 +0200 Subject: [PATCH] Make request authorization non-blocking (elastic/elasticsearch#3837) This change removes the blocking notion from fetching the roles from a remote index. This also removes the blocking client calls that can potentially deadlock a request if executed on the transport thread. Relates to elastic/elasticsearch#3790 Original commit: elastic/x-pack-elasticsearch@c2eda3904394878911b2819924696986a82f771a --- .../xpack/common/GroupedActionListener.java | 69 ++++++ .../xpack/security/SecurityFeatureSet.java | 10 +- .../action/filter/SecurityActionFilter.java | 118 +++++------ .../security/authz/AuthorizationService.java | 32 +-- .../security/authz/AuthorizationUtils.java | 63 ++++++ .../authz/store/CompositeRolesStore.java | 23 +- .../security/authz/store/FileRolesStore.java | 4 +- .../authz/store/NativeRolesStore.java | 139 ++++++------ .../authz/store/ReservedRolesStore.java | 4 +- .../security/authz/store/RolesStore.java | 20 -- .../SecurityServerTransportInterceptor.java | 74 +++++-- .../transport/ServerTransportFilter.java | 25 ++- .../integration/ClearRolesCacheTests.java | 6 +- .../common/GroupedActionListenerTests.java | 107 ++++++++++ .../xpack/security/InternalClientTests.java | 3 - .../filter/SecurityActionFilterTests.java | 43 +++- .../authz/AuthorizationServiceTests.java | 199 ++++++++++-------- .../authz/IndicesAndAliasesResolverTests.java | 34 ++- .../transport/ServerTransportFilterTests.java | 36 +++- .../transport/TransportFilterTests.java | 40 +++- 20 files changed, 721 insertions(+), 328 deletions(-) create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java new file mode 100644 index 00000000000..f77ebf103ca --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.xpack.security.support.Exceptions; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * An action listener that delegates it's results to another listener once + * it has received one or more failures or N results. This allows synchronous + * tasks to be forked off in a loop with the same listener and respond to a higher level listener once all tasks responded. + */ +public final class GroupedActionListener implements ActionListener { + private final CountDown countDown; + private final AtomicInteger pos = new AtomicInteger(); + private final AtomicArray roles; + private final ActionListener> delegate; + private final Collection defaults; + private final AtomicReference failure = new AtomicReference<>(); + + /** + * Creates a new listener + * @param delegate the delegate listener + * @param groupSize the group size + */ + public GroupedActionListener(ActionListener> delegate, int groupSize, Collection defaults) { + roles = new AtomicArray<>(groupSize); + countDown = new CountDown(groupSize); + this.delegate = delegate; + this.defaults = defaults; + } + + @Override + public void onResponse(T element) { + roles.set(pos.incrementAndGet() - 1, element); + if (countDown.countDown()) { + if (failure.get() != null) { + delegate.onFailure(failure.get()); + } else { + List collect = this.roles.asList().stream().map((e) + -> e.value).filter(r -> r != null).collect(Collectors.toList()); + collect.addAll(defaults); + delegate.onResponse(Collections.unmodifiableList(collect)); + } + } + } + + @Override + public void onFailure(Exception e) { + if (failure.compareAndSet(null, e) == false) { + failure.get().addSuppressed(e); + } + if (countDown.countDown()) { + delegate.onFailure(failure.get()); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java index 61a21d2f84f..aa593e91237 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Realms; import org.elasticsearch.xpack.security.authz.store.CompositeRolesStore; -import org.elasticsearch.xpack.security.authz.store.RolesStore; import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.security.transport.filter.IPFilter; import org.elasticsearch.xpack.security.user.AnonymousUser; @@ -89,7 +88,7 @@ public class SecurityFeatureSet implements XPackFeatureSet { @Override public XPackFeatureSet.Usage usage() { Map realmsUsage = buildRealmsUsage(realms); - Map rolesStoreUsage = rolesStoreUsage(rolesStore); + Map rolesStoreUsage = rolesStore == null ? Collections.emptyMap() : rolesStore.usageStats(); Map sslUsage = sslUsage(settings); Map auditUsage = auditUsage(auditTrailService); Map ipFilterUsage = ipFilterUsage(ipFilter); @@ -106,13 +105,6 @@ public class SecurityFeatureSet implements XPackFeatureSet { return realms.usageStats(); } - static Map rolesStoreUsage(@Nullable RolesStore rolesStore) { - if (rolesStore == null) { - return Collections.emptyMap(); - } - return rolesStore.usageStats(); - } - static Map sslUsage(Settings settings) { Map map = new HashMap<>(2); map.put("http", Collections.singletonMap("enabled", HTTP_SSL_ENABLED.get(settings))); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java index aa03f3ceb0e..2a1825f92f5 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.action.filter; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -19,7 +20,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilterChain; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -29,6 +29,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.SecurityContext; import org.elasticsearch.xpack.security.action.SecurityActionMapper; import org.elasticsearch.xpack.security.action.interceptor.RequestInterceptor; @@ -98,13 +99,33 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil final boolean restoreOriginalContext = securityContext.getAuthentication() != null; try { if (licenseState.isAuthAllowed()) { - if (AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action)) { + final boolean useSystemUser = AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action); + // we should always restore the original here because we forcefully changed to the system user + final ThreadContext.StoredContext toRestore = restoreOriginalContext || useSystemUser ? original : () -> {}; + final ActionListener signingListener = new ContextPreservingActionListener<>(toRestore, + ActionListener.wrap(r -> { + try { + listener.onResponse(sign(r)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, listener::onFailure)); + ActionListener authenticatedListener = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + chain.proceed(task, action, request, signingListener); + } + @Override + public void onFailure(Exception e) { + signingListener.onFailure(e); + } + }; + if (useSystemUser) { try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { - applyInternal(task, action, request, new SigningListener(this, listener, original), chain); + applyInternal(action, request, authenticatedListener); } } else { - applyInternal(task, action, request, - new SigningListener(this, listener, restoreOriginalContext ? original : null), chain); + applyInternal(action, request, authenticatedListener); } } else if (SECURITY_ACTION_MATCHER.test(action)) { throw LicenseUtils.newComplianceException(XPackPlugin.SECURITY); @@ -126,7 +147,7 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil return Integer.MIN_VALUE; } - private void applyInternal(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) + private void applyInternal(String action, final ActionRequest request, ActionListener listener) throws IOException { /** here we fallback on the system user. Internal system requests are requests that are triggered by @@ -141,35 +162,34 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil final String securityAction = actionMapper.action(action, request); Authentication authentication = authcService.authenticate(securityAction, request, SystemUser.INSTANCE); assert authentication != null; - authzService.authorize(authentication, securityAction, request); - final User user = authentication.getUser(); - request = unsign(user, securityAction, request); + final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, + (userRoles, runAsRoles) -> { + authzService.authorize(authentication, securityAction, request, userRoles, runAsRoles); + final User user = authentication.getUser(); + unsign(user, securityAction, request); + + /* + * We use a separate concept for code that needs to be run after authentication and authorization that could effect the + * running of the action. This is done to make it more clear of the state of the request. + */ + for (RequestInterceptor interceptor : requestInterceptors) { + if (interceptor.supports(request)) { + interceptor.intercept(request, user); + } + } + listener.onResponse(null); + }); + asyncAuthorizer.authorize(authzService); - /* - * We use a separate concept for code that needs to be run after authentication and authorization that could effect the running of - * the action. This is done to make it more clear of the state of the request. - */ - for (RequestInterceptor interceptor : requestInterceptors) { - if (interceptor.supports(request)) { - interceptor.intercept(request, user); - } - } - // we should always restore the original here because we forcefully changed to the system user - chain.proceed(task, action, request, listener); } - Request unsign(User user, String action, Request request) { - + ActionRequest unsign(User user, String action, final ActionRequest request) { try { - if (request instanceof SearchScrollRequest) { SearchScrollRequest scrollRequest = (SearchScrollRequest) request; String scrollId = scrollRequest.scrollId(); scrollRequest.scrollId(cryptoService.unsignAndVerify(scrollId)); - return request; - } - - if (request instanceof ClearScrollRequest) { + } else if (request instanceof ClearScrollRequest) { ClearScrollRequest clearScrollRequest = (ClearScrollRequest) request; boolean isClearAllScrollRequest = clearScrollRequest.scrollIds().contains("_all"); if (!isClearAllScrollRequest) { @@ -180,64 +200,22 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil } clearScrollRequest.scrollIds(unsignedIds); } - return request; } - - return request; - } catch (IllegalArgumentException | IllegalStateException e) { auditTrail.tamperedRequest(user, action, request); throw authorizationError("invalid request. {}", e.getMessage()); } + return request; } Response sign(Response response) throws IOException { - if (response instanceof SearchResponse) { SearchResponse searchResponse = (SearchResponse) response; String scrollId = searchResponse.getScrollId(); if (scrollId != null && !cryptoService.isSigned(scrollId)) { searchResponse.scrollId(cryptoService.sign(scrollId)); } - return response; } - return response; } - - static class SigningListener implements ActionListener { - - private final SecurityActionFilter filter; - private final ActionListener innerListener; - private final ThreadContext.StoredContext threadContext; - - private SigningListener(SecurityActionFilter filter, ActionListener innerListener, - @Nullable ThreadContext.StoredContext threadContext) { - this.filter = filter; - this.innerListener = innerListener; - this.threadContext = threadContext; - } - - @Override - @SuppressWarnings("unchecked") - public void onResponse(Response response) { - if (threadContext != null) { - threadContext.restore(); - } - try { - response = this.filter.sign(response); - innerListener.onResponse(response); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - if (threadContext != null) { - threadContext.restore(); - } - innerListener.onFailure(e); - } - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index d108eed5aa3..3896b37612e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -30,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xpack.common.GroupedActionListener; import org.elasticsearch.xpack.security.SecurityTemplateService; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Authentication; @@ -49,7 +51,6 @@ import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.XPackUser; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -105,7 +106,8 @@ public class AuthorizationService extends AbstractComponent { * @param request The request * @throws ElasticsearchSecurityException If the given user is no allowed to execute the given request */ - public void authorize(Authentication authentication, String action, TransportRequest request) throws ElasticsearchSecurityException { + public void authorize(Authentication authentication, String action, TransportRequest request, Collection userRoles, + Collection runAsRoles) throws ElasticsearchSecurityException { final TransportRequest originalRequest = request; if (request instanceof ConcreteShardRequest) { request = ((ConcreteShardRequest) request).getRequest(); @@ -122,9 +124,8 @@ public class AuthorizationService extends AbstractComponent { } throw denial(authentication, action, request); } - + Collection roles = userRoles; // get the roles of the authenticated user, which may be different than the effective - Collection roles = roles(authentication.getUser()); GlobalPermission permission = permission(roles); final boolean isRunAs = authentication.getUser() != authentication.getRunAsUser(); @@ -143,7 +144,7 @@ public class AuthorizationService extends AbstractComponent { RunAsPermission runAs = permission.runAs(); if (runAs != null && runAs.check(authentication.getRunAsUser().principal())) { grantRunAs(authentication, action, request); - roles = roles(authentication.getRunAsUser()); + roles = runAsRoles; permission = permission(roles); // permission can be empty as it might be that the run as user's role is unknown if (permission.isEmpty()) { @@ -280,7 +281,7 @@ public class AuthorizationService extends AbstractComponent { return rolesBuilder.build(); } - Collection roles(User user) { + public void roles(User user, ActionListener> roleActionListener) { // we need to special case the internal users in this method, if we apply the anonymous roles to every user including these system // user accounts then we run into the chance of a deadlock because then we need to get a role that we may be trying to get as the // internal user. The SystemUser is special cased as it has special privileges to execute internal actions and should never be @@ -291,7 +292,8 @@ public class AuthorizationService extends AbstractComponent { } if (XPackUser.is(user)) { assert XPackUser.INSTANCE.roles().length == 1 && SuperuserRole.NAME.equals(XPackUser.INSTANCE.roles()[0]); - return Collections.singleton(SuperuserRole.INSTANCE); + roleActionListener.onResponse(Collections.singleton(SuperuserRole.INSTANCE)); + return; } Set roleNames = new HashSet<>(); @@ -302,15 +304,17 @@ public class AuthorizationService extends AbstractComponent { } Collections.addAll(roleNames, anonymousUser.roles()); } - List roles = new ArrayList<>(); - roles.add(DefaultRole.INSTANCE); - for (String roleName : roleNames) { - Role role = rolesStore.role(roleName); - if (role != null) { - roles.add(role); + + final Collection defaultRoles = Collections.singletonList(DefaultRole.INSTANCE); + if (roleNames.isEmpty()) { + roleActionListener.onResponse(defaultRoles); + } else { + final GroupedActionListener listener = new GroupedActionListener<>(roleActionListener, roleNames.size(), + defaultRoles); + for (String roleName : roleNames) { + rolesStore.roles(roleName, listener); } } - return Collections.unmodifiableList(roles); } private static boolean isCompositeAction(String action) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 30e763a88f8..9d126f1121d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -5,12 +5,18 @@ */ package org.elasticsearch.xpack.security.authz; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.support.AutomatonPredicate; import org.elasticsearch.xpack.security.support.Automatons; +import java.util.Collection; +import java.util.Collections; +import java.util.function.BiConsumer; import java.util.function.Predicate; public final class AuthorizationUtils { @@ -59,4 +65,61 @@ public final class AuthorizationUtils { public static boolean isInternalAction(String action) { return INTERNAL_PREDICATE.test(action); } + + /** + * A base class to authorize authorize a given {@link Authentication} against it's users or run-as users roles. + * This class fetches the roles for the users asynchronously and then authenticates the in the callback. + */ + public static class AsyncAuthorizer { + + private final ActionListener listener; + private final BiConsumer, Collection> consumer; + private final Authentication authentication; + private volatile Collection userRoles; + private volatile Collection runAsRoles; + private CountDown countDown = new CountDown(2); // we expect only two responses!! + + public AsyncAuthorizer(Authentication authentication, ActionListener listener, BiConsumer, + Collection> consumer) { + this.consumer = consumer; + this.listener = listener; + this.authentication = authentication; + } + + public void authorize(AuthorizationService service) { + if (SystemUser.is(authentication.getUser())) { + setUserRoles(Collections.emptyList()); // we can inform the listener immediately - nothing to fetch for us on system user + setRunAsRoles(Collections.emptyList()); + } else { + service.roles(authentication.getUser(), ActionListener.wrap(this::setUserRoles, listener::onFailure)); + if (authentication.getUser().equals(authentication.getRunAsUser()) == false) { + assert authentication.getRunAsUser() != null : "runAs user is null but shouldn't"; + service.roles(authentication.getRunAsUser(), ActionListener.wrap(this::setRunAsRoles, listener::onFailure)); + } else { + setRunAsRoles(Collections.emptyList()); + } + } + } + + private void setUserRoles(Collection roles) { + this.userRoles = roles; + maybeRun(); + } + + private void setRunAsRoles(Collection roles) { + this.runAsRoles = roles; + maybeRun(); + } + + private void maybeRun() { + if (countDown.countDown()) { + try { + consumer.accept(userRoles, runAsRoles); + } catch (Exception e) { + listener.onFailure(e); + } + } + } + + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java index 03c8a278891..2cc09de0722 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.authz.store; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.security.authz.permission.Role; @@ -16,7 +17,7 @@ import java.util.Map; * A composite roles store that combines built in roles, file-based roles, and index-based roles. Checks the built in roles first, then the * file roles, and finally the index roles. */ -public class CompositeRolesStore extends AbstractComponent implements RolesStore { +public class CompositeRolesStore extends AbstractComponent { private final FileRolesStore fileRolesStore; private final NativeRolesStore nativeRolesStore; @@ -30,7 +31,7 @@ public class CompositeRolesStore extends AbstractComponent implements RolesStore this.reservedRolesStore = reservedRolesStore; } - public Role role(String role) { + private Role getBuildInRole(String role) { // builtins first Role builtIn = reservedRolesStore.role(role); if (builtIn != null) { @@ -44,15 +45,19 @@ public class CompositeRolesStore extends AbstractComponent implements RolesStore logger.trace("loaded role [{}] from file roles store", role); return fileRole; } - - Role nativeRole = nativeRolesStore.role(role); - if (nativeRole != null) { - logger.trace("loaded role [{}] from native roles store", role); - } - return nativeRole; + return null; } - @Override + public void roles(String role, ActionListener roleActionListener) { + Role storedRole = getBuildInRole(role); + if (storedRole == null) { + nativeRolesStore.role(role, roleActionListener); + } else { + roleActionListener.onResponse(storedRole); + } + } + + public Map usageStats() { Map usage = new HashMap<>(2); usage.put("file", fileRolesStore.usageStats()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java index 136a40b35a0..bdcef269cdd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java @@ -43,7 +43,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableMap; -public class FileRolesStore extends AbstractLifecycleComponent implements RolesStore { +public class FileRolesStore extends AbstractLifecycleComponent { private static final Pattern IN_SEGMENT_LINE = Pattern.compile("^\\s+.+"); private static final Pattern SKIP_LINE = Pattern.compile("(^#.*|^\\s*)"); @@ -86,12 +86,10 @@ public class FileRolesStore extends AbstractLifecycleComponent implements RolesS protected void doClose() throws ElasticsearchException { } - @Override public Role role(String role) { return permissions.get(role); } - @Override public Map usageStats() { Map usageStats = new HashMap<>(); usageStats.put("size", permissions.size()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index de8be2db93d..5816024982b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -25,6 +24,7 @@ import org.elasticsearch.action.search.MultiSearchResponse.Item; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -46,6 +46,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.SecurityTemplateService; import org.elasticsearch.xpack.security.action.role.ClearRolesCacheRequest; @@ -63,9 +64,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -83,7 +83,7 @@ import static org.elasticsearch.xpack.security.SecurityTemplateService.securityI * * No caching is done by this class, it is handled at a higher level */ -public class NativeRolesStore extends AbstractComponent implements RolesStore, ClusterStateListener { +public class NativeRolesStore extends AbstractComponent implements ClusterStateListener { public static final Setting SCROLL_SIZE_SETTING = Setting.intSetting(setting("authz.store.roles.index.scroll.size"), 1000, Property.NodeScope); @@ -127,6 +127,8 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C private SecurityClient securityClient; private int scrollSize; private TimeValue scrollKeepAlive; + // incremented each time the cache is invalidated + private final AtomicLong numInvalidation = new AtomicLong(0); private volatile boolean securityIndexExists = false; @@ -277,8 +279,17 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C logger.trace("attempted to get role [{}] before service was started", role); listener.onResponse(null); } - RoleAndVersion roleAndVersion = getRoleAndVersion(role); - listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRoleDescriptor()); + getRoleAndVersion(role, new ActionListener() { + @Override + public void onResponse(RoleAndVersion roleAndVersion) { + listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRoleDescriptor()); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } public void deleteRole(final DeleteRoleRequest deleteRoleRequest, final ActionListener listener) { @@ -352,16 +363,25 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C } - @Override - public Role role(String roleName) { + public void role(String roleName, ActionListener listener) { if (state() != State.STARTED) { - return null; + listener.onResponse(null); + } else { + getRoleAndVersion(roleName, new ActionListener() { + @Override + public void onResponse(RoleAndVersion roleAndVersion) { + listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRole()); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + + } + }); } - RoleAndVersion roleAndVersion = getRoleAndVersion(roleName); - return roleAndVersion == null ? null : roleAndVersion.getRole(); } - @Override public Map usageStats() { if (state() != State.STARTED) { return Collections.emptyMap(); @@ -445,70 +465,61 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C return usageStats; } - private RoleAndVersion getRoleAndVersion(final String roleId) { + private void getRoleAndVersion(final String roleId, ActionListener roleActionListener) { if (securityIndexExists == false) { - return null; - } - - RoleAndVersion roleAndVersion = null; - final AtomicReference getRef = new AtomicReference<>(null); - final CountDownLatch latch = new CountDownLatch(1); - try { - roleAndVersion = roleCache.computeIfAbsent(roleId, (key) -> { - logger.debug("attempting to load role [{}] from index", key); - executeGetRoleRequest(roleId, new LatchedActionListener<>(new ActionListener() { + roleActionListener.onResponse(null); + } else { + RoleAndVersion cachedRoleAndVersion = roleCache.get(roleId); + if (cachedRoleAndVersion == null) { + final long invalidationCounter = numInvalidation.get(); + executeGetRoleRequest(roleId, new ActionListener() { @Override - public void onResponse(GetResponse role) { - getRef.set(role); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexNotFoundException) { - logger.trace( - (Supplier) () -> new ParameterizedMessage( - "failed to retrieve role [{}] since security index does not exist", roleId), t); - } else { - logger.error((Supplier) () -> new ParameterizedMessage("failed to retrieve role [{}]", roleId), t); + public void onResponse(GetResponse response) { + RoleDescriptor descriptor = transformRole(response); + RoleAndVersion roleAndVersion = null; + if (descriptor != null) { + logger.debug("loaded role [{}] from index with version [{}]", roleId, response.getVersion()); + RoleAndVersion fetchedRoleAndVersion = new RoleAndVersion(descriptor, response.getVersion()); + roleAndVersion = fetchedRoleAndVersion; + if (fetchedRoleAndVersion != null) { + /* this is kinda spooky. We use a read/write lock to ensure we don't modify the cache if we hold the write + * lock (fetching stats for instance - which is kinda overkill?) but since we fetching stuff in an async + * fashion we need to make sure that if the cacht got invalidated since we started the request we don't + * put a potential stale result in the cache, hence the numInvalidation.get() comparison to the number of + * invalidation when we started. we just try to be on the safe side and don't cache potentially stale + * results*/ + try (final ReleasableLock ignored = readLock.acquire()) { + if (invalidationCounter == numInvalidation.get()) { + roleCache.computeIfAbsent(roleId, (k) -> fetchedRoleAndVersion); + } + } catch (ExecutionException e) { + throw new AssertionError("failed to load constant non-null value", e); + } + } else { + logger.trace("role [{}] was not found", roleId); + } } + roleActionListener.onResponse(roleAndVersion); } - }, latch)); - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.error("timed out retrieving role [{}]", roleId); - } - - GetResponse response = getRef.get(); - if (response == null) { - return null; - } - - RoleDescriptor descriptor = transformRole(response); - if (descriptor == null) { - return null; - } - logger.debug("loaded role [{}] from index with version [{}]", key, response.getVersion()); - try (final ReleasableLock ignored = readLock.acquire()) { - return new RoleAndVersion(descriptor, response.getVersion()); - } - }); - } catch (ExecutionException e) { - if (e.getCause() instanceof NullPointerException) { - logger.trace((Supplier) () -> new ParameterizedMessage("role [{}] was not found", roleId), e); + @Override + public void onFailure(Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); + roleActionListener.onFailure(e); + } + }); } else { - logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); + roleActionListener.onResponse(cachedRoleAndVersion); } } - - return roleAndVersion; } private void executeGetRoleRequest(String role, ActionListener listener) { try { GetRequest request = client.prepareGet(SecurityTemplateService.SECURITY_INDEX_NAME, ROLE_DOC_TYPE, role).request(); - client.get(request, listener); + // TODO we use a threaded listener here to make sure we don't execute on a transport thread. This can be removed once + // all blocking operations are removed from this and NativeUserStore + client.get(request, new ThreadedActionListener<>(logger, client.threadPool(), ThreadPool.Names.LISTENER, listener, true)); } catch (IndexNotFoundException e) { logger.trace( (Supplier) () -> new ParameterizedMessage( @@ -551,6 +562,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C public void invalidateAll() { logger.debug("invalidating all roles in cache"); + numInvalidation.incrementAndGet(); try (final ReleasableLock ignored = readLock.acquire()) { roleCache.invalidateAll(); } @@ -558,6 +570,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C public void invalidate(String role) { logger.debug("invalidating role [{}] in cache", role); + numInvalidation.incrementAndGet(); try (final ReleasableLock ignored = readLock.acquire()) { roleCache.invalidate(role); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java index 4eaf675a492..b4079f189dc 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.security.user.KibanaUser; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; -public class ReservedRolesStore implements RolesStore { +public class ReservedRolesStore { private static final User DEFAULT_ENABLED_KIBANA_USER = new KibanaUser(true); private final SecurityContext securityContext; @@ -36,7 +36,6 @@ public class ReservedRolesStore implements RolesStore { this.securityContext = securityContext; } - @Override public Role role(String role) { switch (role) { case SuperuserRole.NAME: @@ -67,7 +66,6 @@ public class ReservedRolesStore implements RolesStore { } } - @Override public Map usageStats() { return Collections.emptyMap(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java deleted file mode 100644 index eeb3ab8862b..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.security.authz.store; - -import org.elasticsearch.xpack.security.authz.permission.Role; - -import java.util.Map; - -/** - * An interface for looking up a role given a string role name - */ -public interface RolesStore { - - Role role(String role); - - Map usageStats(); -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index bd22dc413dd..5c390dd8db3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.transport; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.security.authz.AuthorizationService; import org.elasticsearch.xpack.security.authz.AuthorizationUtils; import org.elasticsearch.xpack.security.authz.accesscontrol.RequestContext; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.security.support.Exceptions; import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport; import org.elasticsearch.tasks.Task; @@ -29,9 +31,13 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.xpack.security.user.SystemUser; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.function.Consumer; import static org.elasticsearch.xpack.XPackSettings.TRANSPORT_SSL_ENABLED; import static org.elasticsearch.xpack.security.Security.setting; @@ -100,8 +106,8 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor @Override public TransportRequestHandler interceptHandler(String action, String executor, TransportRequestHandler actualHandler) { - return new ProfileSecuredRequestHandler<>(action, actualHandler, profileFilters, - licenseState, threadPool.getThreadContext()); + return new ProfileSecuredRequestHandler<>(action, executor, actualHandler, profileFilters, + licenseState, threadPool); } @@ -150,20 +156,45 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor private final Map profileFilters; private final XPackLicenseState licenseState; private final ThreadContext threadContext; + private final String executorName; + private final ThreadPool threadPool; - private ProfileSecuredRequestHandler(String action, TransportRequestHandler handler, + private ProfileSecuredRequestHandler(String action, String executorName, TransportRequestHandler handler, Map profileFilters, XPackLicenseState licenseState, - ThreadContext threadContext) { + ThreadPool threadPool) { this.action = action; + this.executorName = executorName; this.handler = handler; this.profileFilters = profileFilters; this.licenseState = licenseState; - this.threadContext = threadContext; + this.threadContext = threadPool.getThreadContext(); + this.threadPool = threadPool; } @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + final Consumer onFailure = (e) -> { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + }; + final Runnable receiveMessage = () -> { + // FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to + // the query cache + RequestContext context = new RequestContext(request, threadContext); + RequestContext.setCurrent(context); + try { + handler.messageReceived(request, channel, task); + } catch (Exception e) { + onFailure.accept(e); + } finally { + RequestContext.removeCurrent(); + } + }; try (ThreadContext.StoredContext ctx = threadContext.newStoredContext()) { + if (licenseState.isAuthAllowed()) { String profile = channel.getProfileName(); ServerTransportFilter filter = profileFilters.get(profile); @@ -178,16 +209,35 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor } } assert filter != null; - filter.inbound(action, request, channel); + final Thread executingThread = Thread.currentThread(); + Consumer consumer = (x) -> { + final Executor executor; + if (executingThread == Thread.currentThread()) { + // only fork off if we get called on another thread this means we moved to + // an async execution and in this case we need to go back to the thread pool + // that was actually executing it. it's also possible that the + // thread-pool we are supposed to execute on is `SAME` in that case + // the handler is OK with executing on a network thread and we can just continue even if + // we are on another thread due to async operations + executor = threadPool.executor(ThreadPool.Names.SAME); + } else { + executor = threadPool.executor(executorName); + } + + try { + executor.execute(receiveMessage); + } catch (Exception e) { + onFailure.accept(e); + } + + }; + ActionListener filterListener = ActionListener.wrap(consumer, onFailure); + filter.inbound(action, request, channel, filterListener); + } else { + receiveMessage.run(); } - // FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to the query cache - RequestContext context = new RequestContext(request, threadContext); - RequestContext.setCurrent(context); - handler.messageReceived(request, channel, task); } catch (Exception e) { channel.sendResponse(e); - } finally { - RequestContext.removeCurrent(); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index cb09dfc43e9..500ce9e5b3c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.transport; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.DelegatingTransportChannel; @@ -19,6 +20,8 @@ import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authc.pki.PkiRealm; import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.authz.AuthorizationUtils; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.jboss.netty.channel.Channel; import org.jboss.netty.handler.ssl.SslHandler; @@ -27,6 +30,7 @@ import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; import java.security.cert.Certificate; import java.security.cert.X509Certificate; +import java.util.Collection; import static org.elasticsearch.xpack.security.support.Exceptions.authenticationError; @@ -43,7 +47,8 @@ public interface ServerTransportFilter { * thrown by this method will stop the request from being handled and the error will * be sent back to the sender. */ - void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException; + void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException; /** * The server trasnport filter that should be used in nodes as it ensures that an incoming @@ -67,7 +72,8 @@ public interface ServerTransportFilter { } @Override - public void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException { + public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException { /* here we don't have a fallback user, as all incoming request are expected to have a user attached (either in headers or in context) @@ -97,9 +103,13 @@ public interface ServerTransportFilter { } } } - - Authentication authentication = authcService.authenticate(securityAction, request, null); - authzService.authorize(authentication, securityAction, request); + final Authentication authentication = authcService.authenticate(securityAction, request, null); + final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, + (userRoles, runAsRoles) -> { + authzService.authorize(authentication, securityAction, request, userRoles, runAsRoles); + listener.onResponse(null); + }); + asyncAuthorizer.authorize(authzService); } private void extactClientCertificates(SSLEngine sslEngine, Object channel) { @@ -138,13 +148,14 @@ public interface ServerTransportFilter { } @Override - public void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException { + public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException { // TODO is ']' sufficient to mark as shard action? boolean isInternalOrShardAction = action.startsWith("internal:") || action.endsWith("]"); if (isInternalOrShardAction) { throw authenticationError("executing internal/shard actions is considered malicious and forbidden"); } - super.inbound(action, request, transportChannel); + super.inbound(action, request, transportChannel, listener); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java b/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java index 7dc924702f4..a37109afd39 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.integration; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.security.action.role.DeleteRoleResponse; import org.elasticsearch.xpack.security.action.role.GetRolesResponse; import org.elasticsearch.xpack.security.action.role.PutRoleResponse; import org.elasticsearch.xpack.security.authz.permission.FieldPermissions; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.authz.store.NativeRolesStore; import org.elasticsearch.xpack.security.client.SecurityClient; import org.junit.Before; @@ -60,7 +62,9 @@ public class ClearRolesCacheTests extends NativeRealmIntegTestCase { // warm up the caches on every node for (NativeRolesStore rolesStore : internalCluster().getInstances(NativeRolesStore.class)) { for (String role : roles) { - assertThat(rolesStore.role(role), notNullValue()); + PlainActionFuture future = new PlainActionFuture<>(); + rolesStore.role(role, future); + assertThat(future.actionGet(), notNullValue()); } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java new file mode 100644 index 00000000000..ae38f5b36fc --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class GroupedActionListenerTests extends ESTestCase { + + public void testNotifications() throws InterruptedException { + AtomicReference> resRef = new AtomicReference<>(); + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + final int groupSize = randomIntBetween(10, 1000); + AtomicInteger count = new AtomicInteger(); + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList(); + GroupedActionListener listener = new GroupedActionListener<>(result, groupSize, defaults); + int numThreads = randomIntBetween(2, 5); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError(e); + } + int c = 0; + while((c = count.incrementAndGet()) <= groupSize) { + listener.onResponse(c-1); + } + } + }; + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + assertNotNull(resRef.get()); + ArrayList list = new ArrayList<>(resRef.get()); + Collections.sort(list); + int expectedSize = groupSize + defaults.size(); + assertEquals(expectedSize, resRef.get().size()); + int expectedValue = defaults.isEmpty() ? 0 : -1; + for (int i = 0; i < expectedSize; i++) { + assertEquals(Integer.valueOf(expectedValue++), list.get(i)); + } + } + + public void testFailed() { + AtomicReference> resRef = new AtomicReference<>(); + AtomicReference excRef = new AtomicReference<>(); + + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + excRef.set(e); + } + }; + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList(); + int size = randomIntBetween(3, 4); + GroupedActionListener listener = new GroupedActionListener<>(result, size, defaults); + listener.onResponse(0); + IOException ioException = new IOException(); + RuntimeException rtException = new RuntimeException(); + listener.onFailure(rtException); + listener.onFailure(ioException); + if (size == 4) { + listener.onResponse(2); + } + assertNotNull(excRef.get()); + assertEquals(rtException, excRef.get()); + assertEquals(1, excRef.get().getSuppressed().length); + assertEquals(ioException, excRef.get().getSuppressed()[0]); + assertNull(resRef.get()); + listener.onResponse(1); + assertNull(resRef.get()); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java index dcae61a7742..59470300949 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.security; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -17,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.crypto.CryptoService; @@ -25,7 +23,6 @@ import org.elasticsearch.xpack.security.crypto.CryptoService; import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class InternalClientTests extends ESTestCase { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java index 1d314764236..8dff4dd5603 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.security.action.filter; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import org.elasticsearch.ElasticsearchSecurityException; @@ -17,8 +19,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.SecurityContext; -import org.elasticsearch.xpack.security.action.SecurityActionMapper; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authc.Authentication.RealmRef; @@ -31,8 +33,10 @@ import org.elasticsearch.license.XPackLicenseState; import org.junit.Before; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -72,11 +76,18 @@ public class SecurityActionFilterTests extends ESTestCase { Task task = mock(Task.class); User user = new User("username", "r1", "r2"); Authentication authentication = new Authentication(user, new RealmRef("test", "test", "foo"), null); + when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); doReturn(request).when(spy(filter)).unsign(user, "_action", request); filter.apply(task, "_action", request, listener, chain); - verify(authzService).authorize(authentication, "_action", request); - verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(SecurityActionFilter.SigningListener.class)); + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ContextPreservingActionListener.class)); } public void testActionProcessException() throws Exception { @@ -88,7 +99,14 @@ public class SecurityActionFilterTests extends ESTestCase { User user = new User("username", "r1", "r2"); Authentication authentication = new Authentication(user, new RealmRef("test", "test", "foo"), null); when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); - doThrow(exception).when(authzService).authorize(authentication, "_action", request); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); + doThrow(exception).when(authzService).authorize(eq(authentication), eq("_action"), eq(request), any(Collection.class), + any(Collection.class)); filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(exception); verifyNoMoreInteractions(chain); @@ -104,10 +122,17 @@ public class SecurityActionFilterTests extends ESTestCase { when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); when(cryptoService.isSigned("signed_scroll_id")).thenReturn(true); when(cryptoService.unsignAndVerify("signed_scroll_id")).thenReturn("scroll_id"); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); filter.apply(task, "_action", request, listener, chain); assertThat(request.scrollId(), equalTo("scroll_id")); - verify(authzService).authorize(authentication, "_action", request); - verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(SecurityActionFilter.SigningListener.class)); + + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ContextPreservingActionListener.class)); } public void testActionSignatureError() throws Exception { @@ -121,6 +146,12 @@ public class SecurityActionFilterTests extends ESTestCase { when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); when(cryptoService.isSigned("scroll_id")).thenReturn(true); doThrow(sigException).when(cryptoService).unsignAndVerify("scroll_id"); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(isA(ElasticsearchSecurityException.class)); verify(auditTrail).tamperedRequest(user, "_action", request); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 2a2b7cda677..c7bcf3c8881 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; @@ -54,6 +55,7 @@ import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsAction; @@ -95,6 +97,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -109,6 +112,9 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -118,11 +124,12 @@ import static org.mockito.Mockito.when; public class AuthorizationServiceTests extends ESTestCase { private AuditTrailService auditTrail; - private CompositeRolesStore rolesStore; private ClusterService clusterService; private AuthorizationService authorizationService; private ThreadContext threadContext; private ThreadPool threadPool; + private Map roleMap = new HashMap<>(); + private CompositeRolesStore rolesStore; @Before public void setup() { @@ -132,19 +139,34 @@ public class AuthorizationServiceTests extends ESTestCase { threadContext = new ThreadContext(Settings.EMPTY); threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(threadContext); - + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(roleMap.get((String)i.getArguments()[0])); + return Void.TYPE; + }).when(rolesStore).roles(any(String.class), any(ActionListener.class)); authorizationService = new AuthorizationService(Settings.EMPTY, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, new AnonymousUser(Settings.EMPTY)); } + private void authorize(Authentication authentication, String action, TransportRequest request) { + PlainActionFuture future = new PlainActionFuture(); + AuthorizationUtils.AsyncAuthorizer authorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, future, + (userRoles, runAsRoles) -> {authorizationService.authorize(authentication, action, request, userRoles, runAsRoles); + future.onResponse(null); + }); + authorizer.authorize(authorizationService); + future.actionGet(); + } + public void testActionsSystemUserIsAuthorized() { TransportRequest request = mock(TransportRequest.class); // A failure would throw an exception - authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "indices:monitor/whatever", request); + authorize(createAuthentication(SystemUser.INSTANCE), "indices:monitor/whatever", request); verify(auditTrail).accessGranted(SystemUser.INSTANCE, "indices:monitor/whatever", request); - authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "internal:whatever", request); + authorize(createAuthentication(SystemUser.INSTANCE), "internal:whatever", request); verify(auditTrail).accessGranted(SystemUser.INSTANCE, "internal:whatever", request); verifyNoMoreInteractions(auditTrail); } @@ -152,7 +174,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testIndicesActionsAreNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "indices:", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "indices:", request), "indices:", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "indices:", request); verifyNoMoreInteractions(auditTrail); @@ -161,7 +183,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testClusterAdminActionsAreNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/whatever", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/whatever", request), "cluster:admin/whatever", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "cluster:admin/whatever", request); verifyNoMoreInteractions(auditTrail); @@ -170,7 +192,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testClusterAdminSnapshotStatusActionIsNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/snapshot/status", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/snapshot/status", request), "cluster:admin/snapshot/status", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "cluster:admin/snapshot/status", request); verifyNoMoreInteractions(auditTrail); @@ -181,7 +203,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user"); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -192,7 +214,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user", "non-existent-role"); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -201,10 +223,10 @@ public class AuthorizationServiceTests extends ESTestCase { public void testThatNonIndicesAndNonClusterActionIsDenied() { TransportRequest request = mock(TransportRequest.class); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "whatever", request), + () -> authorize(createAuthentication(user), "whatever", request), "whatever", "test user"); verify(auditTrail).accessDenied(user, "whatever", request); verifyNoMoreInteractions(auditTrail); @@ -213,11 +235,11 @@ public class AuthorizationServiceTests extends ESTestCase { public void testThatRoleWithNoIndicesIsDenied() { TransportRequest request = new IndicesExistsRequest("a"); User user = new User("test user", "no_indices"); - when(rolesStore.role("no_indices")).thenReturn(Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); + roleMap.put("no_indices", Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -225,7 +247,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testSearchAgainstEmptyCluster() { User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); { @@ -234,7 +256,7 @@ public class AuthorizationServiceTests extends ESTestCase { .indicesOptions(IndicesOptions.fromOptions(false, true, true, false)); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest), + () -> authorize(createAuthentication(user), SearchAction.NAME, searchRequest), SearchAction.NAME, "test user"); verify(auditTrail).accessDenied(user, SearchAction.NAME, searchRequest); verifyNoMoreInteractions(auditTrail); @@ -244,7 +266,7 @@ public class AuthorizationServiceTests extends ESTestCase { //ignore_unavailable and allow_no_indices both set to true, user is not authorized for this index nor does it exist SearchRequest searchRequest = new SearchRequest("does_not_exist") .indicesOptions(IndicesOptions.fromOptions(true, true, true, false)); - authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest); + authorize(createAuthentication(user), SearchAction.NAME, searchRequest); verify(auditTrail).accessGranted(user, SearchAction.NAME, searchRequest); IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationService.INDICES_PERMISSIONS_KEY); IndicesAccessControl.IndexAccessControl indexAccessControl = @@ -256,33 +278,32 @@ public class AuthorizationServiceTests extends ESTestCase { public void testScrollRelatedRequestsAllowed() { User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - authorizationService.authorize(createAuthentication(user), ClearScrollAction.NAME, clearScrollRequest); + authorize(createAuthentication(user), ClearScrollAction.NAME, clearScrollRequest); verify(auditTrail).accessGranted(user, ClearScrollAction.NAME, clearScrollRequest); SearchScrollRequest searchScrollRequest = new SearchScrollRequest(); - authorizationService.authorize(createAuthentication(user), SearchScrollAction.NAME, searchScrollRequest); + authorize(createAuthentication(user), SearchScrollAction.NAME, searchScrollRequest); verify(auditTrail).accessGranted(user, SearchScrollAction.NAME, searchScrollRequest); // We have to use a mock request for other Scroll actions as the actual requests are package private to SearchTransportService TransportRequest request = mock(TransportRequest.class); - authorizationService - .authorize(createAuthentication(user), SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); verifyNoMoreInteractions(auditTrail); } @@ -291,10 +312,10 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("b"); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -307,10 +328,10 @@ public class AuthorizationServiceTests extends ESTestCase { request.alias(new Alias("a2")); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), CreateIndexAction.NAME, request), + () -> authorize(createAuthentication(user), CreateIndexAction.NAME, request), IndicesAliasesAction.NAME, "test user"); verify(auditTrail).accessDenied(user, IndicesAliasesAction.NAME, request); verifyNoMoreInteractions(auditTrail); @@ -323,9 +344,9 @@ public class AuthorizationServiceTests extends ESTestCase { request.alias(new Alias("a2")); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a", "a2").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a", "a2").build()); - authorizationService.authorize(createAuthentication(user), CreateIndexAction.NAME, request); + authorize(createAuthentication(user), CreateIndexAction.NAME, request); verify(auditTrail).accessGranted(user, CreateIndexAction.NAME, request); verifyNoMoreInteractions(auditTrail); @@ -341,10 +362,10 @@ public class AuthorizationServiceTests extends ESTestCase { authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(anonymousUser), "indices:a", request), + () -> authorize(createAuthentication(anonymousUser), "indices:a", request), "indices:a", anonymousUser.principal()); verify(auditTrail).accessDenied(anonymousUser, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -363,10 +384,10 @@ public class AuthorizationServiceTests extends ESTestCase { authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, new AnonymousUser(settings)); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, - () -> authorizationService.authorize(createAuthentication(anonymousUser), "indices:a", request)); + () -> authorize(createAuthentication(anonymousUser), "indices:a", request)); assertAuthenticationException(securityException, containsString("action [indices:a] requires authentication")); verify(auditTrail).accessDenied(anonymousUser, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -379,7 +400,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user", null, new User("run as me", new String[] { "admin" })); assertThat(user.runAs(), is(notNullValue())); assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); // run as [run as me] verify(auditTrail).runAsDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -389,14 +410,14 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = mock(TransportRequest.class); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "doesn't exist")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "not the right user")) .add(IndexPrivilege.ALL, "a") .build()); assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); verify(auditTrail).runAsDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -406,7 +427,7 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("a"); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "b")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "run as me")) .add(IndexPrivilege.ALL, "a") @@ -420,7 +441,7 @@ public class AuthorizationServiceTests extends ESTestCase { .settings(Settings.builder().put("index.version.created", Version.CURRENT).build()) .numberOfShards(1).numberOfReplicas(0).build(), true) .build()); - when(rolesStore.role("b")).thenReturn(Role + roleMap.put("b", Role .builder("b") .add(IndexPrivilege.ALL, "b") .build()); @@ -429,7 +450,7 @@ public class AuthorizationServiceTests extends ESTestCase { } assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); verify(auditTrail).runAsGranted(user, "indices:a", request); verify(auditTrail).accessDenied(user, "indices:a", request); @@ -440,7 +461,7 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("b"); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "b")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "run as me")) .add(IndexPrivilege.ALL, "a") @@ -452,12 +473,12 @@ public class AuthorizationServiceTests extends ESTestCase { .settings(Settings.builder().put("index.version.created", Version.CURRENT).build()) .numberOfShards(1).numberOfReplicas(0).build(), true) .build()); - when(rolesStore.role("b")).thenReturn(Role + roleMap.put("b", Role .builder("b") .add(IndexPrivilege.ALL, "b") .build()); - authorizationService.authorize(createAuthentication(user), "indices:a", request); + authorize(createAuthentication(user), "indices:a", request); verify(auditTrail).runAsGranted(user, "indices:a", request); verify(auditTrail).accessGranted(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -465,7 +486,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testNonXPackUserCannotExecuteOperationAgainstSecurityIndex() { User user = new User("all_access_user", "all_access"); - when(rolesStore.role("all_access")).thenReturn(Role.builder("all_access") + roleMap.put("all_access", Role.builder("all_access") .add(IndexPrivilege.ALL, "*") .cluster(ClusterPrivilege.ALL) .build()); @@ -496,7 +517,7 @@ public class AuthorizationServiceTests extends ESTestCase { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), action, request), + () -> authorize(createAuthentication(user), action, request), action, "all_access_user"); verify(auditTrail).accessDenied(user, action, request); verifyNoMoreInteractions(auditTrail); @@ -504,23 +525,23 @@ public class AuthorizationServiceTests extends ESTestCase { // we should allow waiting for the health of the index or any index if the user has this permission ClusterHealthRequest request = new ClusterHealthRequest(SecurityTemplateService.SECURITY_INDEX_NAME); - authorizationService.authorize(createAuthentication(user), ClusterHealthAction.NAME, request); + authorize(createAuthentication(user), ClusterHealthAction.NAME, request); verify(auditTrail).accessGranted(user, ClusterHealthAction.NAME, request); // multiple indices request = new ClusterHealthRequest(SecurityTemplateService.SECURITY_INDEX_NAME, "foo", "bar"); - authorizationService.authorize(createAuthentication(user), ClusterHealthAction.NAME, request); + authorize(createAuthentication(user), ClusterHealthAction.NAME, request); verify(auditTrail).accessGranted(user, ClusterHealthAction.NAME, request); SearchRequest searchRequest = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest); + authorize(createAuthentication(user), SearchAction.NAME, searchRequest); assertEquals(2, searchRequest.indices().length); assertEquals(IndicesAndAliasesResolver.NO_INDICES_LIST, Arrays.asList(searchRequest.indices())); } public void testGrantedNonXPackUserCanExecuteMonitoringOperationsAgainstSecurityIndex() { User user = new User("all_access_user", "all_access"); - when(rolesStore.role("all_access")).thenReturn(Role.builder("all_access") + roleMap.put("all_access", Role.builder("all_access") .add(IndexPrivilege.ALL, "*") .cluster(ClusterPrivilege.ALL) .build()); @@ -546,14 +567,14 @@ public class AuthorizationServiceTests extends ESTestCase { for (Tuple requestTuple : requests) { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); - authorizationService.authorize(createAuthentication(user), action, request); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); } } public void testXPackUserAndSuperusersCanExecuteOperationAgainstSecurityIndex() { final User superuser = new User("custom_admin", SuperuserRole.NAME); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); ClusterState state = mock(ClusterState.class); when(clusterService.state()).thenReturn(state); when(state.metaData()).thenReturn(MetaData.builder() @@ -582,7 +603,7 @@ public class AuthorizationServiceTests extends ESTestCase { for (Tuple requestTuple : requests) { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); - authorizationService.authorize(createAuthentication(user), action, request); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); } } @@ -590,7 +611,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testXPackUserAndSuperusersCanExecuteOperationAgainstSecurityIndexWithWildcard() { final User superuser = new User("custom_admin", SuperuserRole.NAME); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); ClusterState state = mock(ClusterState.class); when(clusterService.state()).thenReturn(state); when(state.metaData()).thenReturn(MetaData.builder() @@ -601,12 +622,12 @@ public class AuthorizationServiceTests extends ESTestCase { String action = SearchAction.NAME; SearchRequest request = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(XPackUser.INSTANCE), action, request); + authorize(createAuthentication(XPackUser.INSTANCE), action, request); verify(auditTrail).accessGranted(XPackUser.INSTANCE, action, request); assertThat(request.indices(), arrayContaining(".security")); request = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(superuser), action, request); + authorize(createAuthentication(superuser), action, request); verify(auditTrail).accessGranted(superuser, action, request); assertThat(request.indices(), arrayContaining(".security")); } @@ -617,25 +638,26 @@ public class AuthorizationServiceTests extends ESTestCase { final AnonymousUser anonymousUser = new AnonymousUser(settings); authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("anonymous_user_role")) - .thenReturn(Role.builder("anonymous_user_role") + roleMap.put("anonymous_user_role", Role.builder("anonymous_user_role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); mockEmptyMetaData(); // sanity check the anonymous user - authorizationService.authorize(createAuthentication(anonymousUser), ClusterHealthAction.NAME, request); - authorizationService.authorize(createAuthentication(anonymousUser), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); + authorize(createAuthentication(anonymousUser), ClusterHealthAction.NAME, request); + authorize(createAuthentication(anonymousUser), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); // test the no role user final User userWithNoRoles = new User("no role user"); - authorizationService.authorize(createAuthentication(userWithNoRoles), ClusterHealthAction.NAME, request); - authorizationService.authorize(createAuthentication(userWithNoRoles), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); + authorize(createAuthentication(userWithNoRoles), ClusterHealthAction.NAME, request); + authorize(createAuthentication(userWithNoRoles), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); } public void testDefaultRoleUserWithoutRoles() { - Collection roles = authorizationService.roles(new User("no role user")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("no role user"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(1, roles.size()); assertEquals(DefaultRole.NAME, roles.iterator().next().name()); } @@ -645,13 +667,14 @@ public class AuthorizationServiceTests extends ESTestCase { final AnonymousUser anonymousUser = new AnonymousUser(settings); authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("anonymous_user_role")) - .thenReturn(Role.builder("anonymous_user_role") + roleMap.put("anonymous_user_role", Role.builder("anonymous_user_role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); mockEmptyMetaData(); - Collection roles = authorizationService.roles(new User("no role user")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("no role user"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(2, roles.size()); for (Role role : roles) { assertThat(role.name(), either(equalTo(DefaultRole.NAME)).or(equalTo("anonymous_user_role"))); @@ -659,12 +682,13 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testDefaultRoleUserWithSomeRole() { - when(rolesStore.role("role")) - .thenReturn(Role.builder("role") + roleMap.put("role", Role.builder("role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); - Collection roles = authorizationService.roles(new User("user with role", "role")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("user with role", "role"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(2, roles.size()); for (Role role : roles) { assertThat(role.name(), either(equalTo(DefaultRole.NAME)).or(equalTo("role"))); @@ -677,9 +701,9 @@ public class AuthorizationServiceTests extends ESTestCase { String action = compositeRequest.v1(); TransportRequest request = compositeRequest.v2(); User user = new User("test user", "no_indices"); - when(rolesStore.role("no_indices")).thenReturn(Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); + roleMap.put("no_indices", Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), action, request), action, "test user"); + () -> authorize(createAuthentication(user), action, request), action, "test user"); verify(auditTrail).accessDenied(user, action, request); verifyNoMoreInteractions(auditTrail); } @@ -690,8 +714,9 @@ public class AuthorizationServiceTests extends ESTestCase { String action = compositeRequest.v1(); TransportRequest request = compositeRequest.v2(); User user = new User("test user", "role"); - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, randomBoolean() ? "a" : "index").build()); - authorizationService.authorize(createAuthentication(user), action, request); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, + randomBoolean() ? "a" : "index").build()); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); verifyNoMoreInteractions(auditTrail); } @@ -700,9 +725,10 @@ public class AuthorizationServiceTests extends ESTestCase { String action = randomCompositeRequest().v1(); TransportRequest request = mock(TransportRequest.class); User user = new User("test user", "role"); - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, randomBoolean() ? "a" : "index").build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, + randomBoolean() ? "a" : "index").build()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, - () -> authorizationService.authorize(createAuthentication(user), action, request)); + () -> authorize(createAuthentication(user), action, request)); assertThat(illegalStateException.getMessage(), containsString("Composite actions must implement CompositeIndicesRequest")); } @@ -737,13 +763,13 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new MockIndicesRequest(); User userAllowed = new User("userAllowed", "roleAllowed"); - when(rolesStore.role("roleAllowed")).thenReturn(Role.builder("roleAllowed").add(IndexPrivilege.ALL, "index").build()); + roleMap.put("roleAllowed", Role.builder("roleAllowed").add(IndexPrivilege.ALL, "index").build()); User userDenied = new User("userDenied", "roleDenied"); - when(rolesStore.role("roleDenied")).thenReturn(Role.builder("roleDenied").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("roleDenied", Role.builder("roleDenied").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); - authorizationService.authorize(createAuthentication(userAllowed), action, request); + authorize(createAuthentication(userAllowed), action, request); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(userDenied), action, request), action, "userDenied"); + () -> authorize(createAuthentication(userDenied), action, request), action, "userDenied"); } private static Tuple randomCompositeRequest() { @@ -787,7 +813,9 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testDoesNotUseRolesStoreForXPackUser() { - Collection roles = authorizationService.roles(XPackUser.INSTANCE); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(XPackUser.INSTANCE, rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertThat(roles, contains(SuperuserRole.INSTANCE)); verifyZeroInteractions(rolesStore); } @@ -800,7 +828,7 @@ public class AuthorizationServiceTests extends ESTestCase { final boolean roleExists = randomBoolean(); final Role anonymousRole = Role.builder("a_all").add(IndexPrivilege.ALL, "a").build(); if (roleExists) { - when(rolesStore.role("a_all")).thenReturn(anonymousRole); + roleMap.put("a_all", anonymousRole); } final MetaData metaData = MetaData.builder() .put(new IndexMetaData.Builder("a") @@ -809,9 +837,11 @@ public class AuthorizationServiceTests extends ESTestCase { .build(); User user = new User("no_roles"); - final Collection roles = authorizationService.roles(user); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(user, rolesFuture); + final Collection roles = rolesFuture.actionGet(); GlobalPermission globalPermission = authorizationService.permission(roles); - verify(rolesStore).role("a_all"); + verify(rolesStore).roles(eq("a_all"), any(ActionListener.class)); if (roleExists) { assertThat(roles, containsInAnyOrder(anonymousRole, DefaultRole.INSTANCE)); @@ -835,7 +865,8 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testGetRolesForSystemUserThrowsException() { - IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> authorizationService.roles(SystemUser.INSTANCE)); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> authorizationService.roles(SystemUser.INSTANCE, + null)); assertEquals("the user [_system] is the system user and we should never try to get its roles", iae.getMessage()); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java index 1160d16be71..12be38a9744 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -23,6 +24,7 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -51,6 +53,8 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; @@ -58,6 +62,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,6 +77,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { private AuthorizationService authzService; private IndicesAndAliasesResolver defaultIndicesResolver; private IndexNameExpressionResolver indexNameExpressionResolver; + private Map roleMap; @Before public void setup() { @@ -105,10 +112,18 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { rolesStore = mock(CompositeRolesStore.class); String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed"}; String[] dashIndices = new String[]{"-index10", "-index11", "-index20", "-index21"}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); - when(rolesStore.role("dash")).thenReturn(Role.builder("dash").add(IndexPrivilege.ALL, dashIndices).build()); - when(rolesStore.role("test")).thenReturn(Role.builder("test").cluster(ClusterPrivilege.MONITOR).build()); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap = new HashMap<>(); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("dash", Role.builder("dash").add(IndexPrivilege.ALL, dashIndices).build()); + roleMap.put("test", Role.builder("test").cluster(ClusterPrivilege.MONITOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(roleMap.get(i.getArguments()[0])); + return Void.TYPE; + }).when(rolesStore).roles(any(String.class), any(ActionListener.class)); + ClusterService clusterService = mock(ClusterService.class); authzService = new AuthorizationService(settings, rolesStore, clusterService, mock(AuditTrailService.class), new DefaultAuthenticationFailureHandler(), mock(ThreadPool.class), @@ -1033,7 +1048,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { public void testNonXPackUserAccessingSecurityIndex() { User allAccessUser = new User("all_access", "all_access"); - when(rolesStore.role("all_access")).thenReturn( + roleMap.put("all_access", Role.builder("all_access").add(IndexPrivilege.ALL, "*").cluster(ClusterPrivilege.ALL).build()); { @@ -1078,7 +1093,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // make the user authorized String dateTimeIndex = indexNameExpressionResolver.resolveDateMathExpression(""); String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed", dateTimeIndex}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); SearchRequest request = new SearchRequest(""); if (randomBoolean()) { @@ -1115,7 +1130,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // make the user authorized String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed", indexNameExpressionResolver.resolveDateMathExpression("")}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); GetAliasesRequest request = new GetAliasesRequest("").indices("foo", "foofoo"); Set indices = defaultIndicesResolver.resolve(request, metaData, buildAuthorizedIndices(user, GetAliasesAction.NAME)); //the union of all indices and aliases gets returned @@ -1129,8 +1144,9 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // TODO with the removal of DeleteByQuery is there another way to test resolving a write action? private AuthorizedIndices buildAuthorizedIndices(User user, String action) { - Collection roles = authzService.roles(user); - return new AuthorizedIndices(user, roles, action, metaData); + PlainActionFuture> rolesListener = new PlainActionFuture<>(); + authzService.roles(user, rolesListener); + return new AuthorizedIndices(user, rolesListener.actionGet(), action, metaData); } private static IndexMetaData.Builder indexBuilder(String index) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java index f3759ad9ea5..27695c4d49b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.transport; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.TransportChannel; @@ -16,11 +18,19 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.user.SystemUser; +import org.elasticsearch.xpack.security.user.User; +import org.elasticsearch.xpack.security.user.XPackUser; import org.junit.Before; +import java.util.Collection; +import java.util.Collections; + import static org.elasticsearch.xpack.security.support.Exceptions.authenticationError; import static org.elasticsearch.xpack.security.support.Exceptions.authorizationError; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -46,16 +56,21 @@ public class ServerTransportFilterTests extends ESTestCase { public void testInbound() throws Exception { TransportRequest request = mock(TransportRequest.class); Authentication authentication = mock(Authentication.class); + when(authentication.getUser()).thenReturn(SystemUser.INSTANCE); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); - filter.inbound("_action", request, channel); - verify(authzService).authorize(authentication, "_action", request); + PlainActionFuture future = new PlainActionFuture(); + filter.inbound("_action", request, channel, future); + //future.get(); // don't block it's not called really just mocked + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); } public void testInboundAuthenticationException() throws Exception { TransportRequest request = mock(TransportRequest.class); doThrow(authenticationError("authc failed")).when(authcService).authenticate("_action", request, null); try { - filter.inbound("_action", request, channel); + PlainActionFuture future = new PlainActionFuture(); + filter.inbound("_action", request, channel, future); + future.actionGet(); fail("expected filter inbound to throw an authentication exception on authentication error"); } catch (ElasticsearchSecurityException e) { assertThat(e.getMessage(), equalTo("authc failed")); @@ -67,9 +82,20 @@ public class ServerTransportFilterTests extends ESTestCase { TransportRequest request = mock(TransportRequest.class); Authentication authentication = mock(Authentication.class); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); - doThrow(authorizationError("authz failed")).when(authzService).authorize(authentication, "_action", request); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); + when(authentication.getUser()).thenReturn(XPackUser.INSTANCE); + when(authentication.getRunAsUser()).thenReturn(XPackUser.INSTANCE); + PlainActionFuture future = new PlainActionFuture(); + doThrow(authorizationError("authz failed")).when(authzService).authorize(authentication, "_action", request, + Collections.emptyList(), Collections.emptyList()); try { - filter.inbound("_action", request, channel); + filter.inbound("_action", request, channel, future); + future.actionGet(); fail("expected filter inbound to throw an authorization exception on authorization error"); } catch (ElasticsearchSecurityException e) { assertThat(e.getMessage(), equalTo("authz failed")); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java index 24cd9d22901..44014632f97 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java @@ -5,11 +5,11 @@ */ package org.elasticsearch.xpack.security.transport; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Binder; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ssl.SSLService; import org.mockito.InOrder; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -52,8 +53,10 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -113,9 +116,11 @@ public class TransportFilterTests extends ESIntegTestCase { InOrder inOrder = inOrder(sourceAuth, targetServerFilter, targetAuth, sourceServerFilter); inOrder.verify(sourceAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class)); + inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class), + any(ActionListener.class)); inOrder.verify(targetAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class)); + inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class), + any(ActionListener.class)); } public static class InternalPlugin extends Plugin { @@ -288,6 +293,7 @@ public class TransportFilterTests extends ESIntegTestCase { public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, SearchRequestParsers searchRequestParsers) { + interceptor = new InternalPluginServerTransportServiceInterceptor(clusterService.getSettings(), threadPool, authenticationService, authorizationService); return Collections.emptyList(); @@ -295,12 +301,9 @@ public class TransportFilterTests extends ESIntegTestCase { @Override public Collection createGuiceModules() { - return Collections.singleton(new Module() { - @Override - public void configure(Binder binder) { - binder.bind(AuthenticationService.class).toInstance(authenticationService); - binder.bind(AuthorizationService.class).toInstance(authorizationService); - } + return Collections.singleton((Module) binder -> { + binder.bind(AuthenticationService.class).toInstance(authenticationService); + binder.bind(AuthorizationService.class).toInstance(authorizationService); }); } @@ -327,12 +330,29 @@ public class TransportFilterTests extends ESIntegTestCase { super(settings, threadPool,authenticationService, authorizationService, mock(XPackLicenseState.class), mock(SSLService.class)); when(licenseState.isAuthAllowed()).thenReturn(true); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[3]; + callback.onResponse(null); + return Void.TYPE; + }).when(authorizationService).roles(any(), any(ActionListener.class)); } @Override protected Map initializeProfileFilters() { + ServerTransportFilter.NodeProfile mock = mock(ServerTransportFilter.NodeProfile.class); + try { + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[3]; + callback.onResponse(null); + return Void.TYPE; + }).when(mock).inbound(any(), any(), any(), any(ActionListener.class)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } return Collections.singletonMap(TransportSettings.DEFAULT_PROFILE, - mock(ServerTransportFilter.NodeProfile.class)); + mock); } } }