From a50bc7946bd52032c4cd4c566bce8e834cf9a80d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 25 Oct 2016 17:28:29 +0200 Subject: [PATCH 1/2] Make request authorization non-blocking (elastic/elasticsearch#3837) This change removes the blocking notion from fetching the roles from a remote index. This also removes the blocking client calls that can potentially deadlock a request if executed on the transport thread. Relates to elastic/elasticsearch#3790 Original commit: elastic/x-pack-elasticsearch@c2eda3904394878911b2819924696986a82f771a --- .../xpack/common/GroupedActionListener.java | 69 ++++++ .../xpack/security/SecurityFeatureSet.java | 10 +- .../action/filter/SecurityActionFilter.java | 118 +++++------ .../security/authz/AuthorizationService.java | 32 +-- .../security/authz/AuthorizationUtils.java | 63 ++++++ .../authz/store/CompositeRolesStore.java | 23 +- .../security/authz/store/FileRolesStore.java | 4 +- .../authz/store/NativeRolesStore.java | 139 ++++++------ .../authz/store/ReservedRolesStore.java | 4 +- .../security/authz/store/RolesStore.java | 20 -- .../SecurityServerTransportInterceptor.java | 74 +++++-- .../transport/ServerTransportFilter.java | 25 ++- .../integration/ClearRolesCacheTests.java | 6 +- .../common/GroupedActionListenerTests.java | 107 ++++++++++ .../xpack/security/InternalClientTests.java | 3 - .../filter/SecurityActionFilterTests.java | 43 +++- .../authz/AuthorizationServiceTests.java | 199 ++++++++++-------- .../authz/IndicesAndAliasesResolverTests.java | 34 ++- .../transport/ServerTransportFilterTests.java | 36 +++- .../transport/TransportFilterTests.java | 40 +++- 20 files changed, 721 insertions(+), 328 deletions(-) create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java new file mode 100644 index 00000000000..f77ebf103ca --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/common/GroupedActionListener.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.xpack.security.support.Exceptions; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * An action listener that delegates it's results to another listener once + * it has received one or more failures or N results. This allows synchronous + * tasks to be forked off in a loop with the same listener and respond to a higher level listener once all tasks responded. + */ +public final class GroupedActionListener implements ActionListener { + private final CountDown countDown; + private final AtomicInteger pos = new AtomicInteger(); + private final AtomicArray roles; + private final ActionListener> delegate; + private final Collection defaults; + private final AtomicReference failure = new AtomicReference<>(); + + /** + * Creates a new listener + * @param delegate the delegate listener + * @param groupSize the group size + */ + public GroupedActionListener(ActionListener> delegate, int groupSize, Collection defaults) { + roles = new AtomicArray<>(groupSize); + countDown = new CountDown(groupSize); + this.delegate = delegate; + this.defaults = defaults; + } + + @Override + public void onResponse(T element) { + roles.set(pos.incrementAndGet() - 1, element); + if (countDown.countDown()) { + if (failure.get() != null) { + delegate.onFailure(failure.get()); + } else { + List collect = this.roles.asList().stream().map((e) + -> e.value).filter(r -> r != null).collect(Collectors.toList()); + collect.addAll(defaults); + delegate.onResponse(Collections.unmodifiableList(collect)); + } + } + } + + @Override + public void onFailure(Exception e) { + if (failure.compareAndSet(null, e) == false) { + failure.get().addSuppressed(e); + } + if (countDown.countDown()) { + delegate.onFailure(failure.get()); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java index 61a21d2f84f..aa593e91237 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Realms; import org.elasticsearch.xpack.security.authz.store.CompositeRolesStore; -import org.elasticsearch.xpack.security.authz.store.RolesStore; import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.security.transport.filter.IPFilter; import org.elasticsearch.xpack.security.user.AnonymousUser; @@ -89,7 +88,7 @@ public class SecurityFeatureSet implements XPackFeatureSet { @Override public XPackFeatureSet.Usage usage() { Map realmsUsage = buildRealmsUsage(realms); - Map rolesStoreUsage = rolesStoreUsage(rolesStore); + Map rolesStoreUsage = rolesStore == null ? Collections.emptyMap() : rolesStore.usageStats(); Map sslUsage = sslUsage(settings); Map auditUsage = auditUsage(auditTrailService); Map ipFilterUsage = ipFilterUsage(ipFilter); @@ -106,13 +105,6 @@ public class SecurityFeatureSet implements XPackFeatureSet { return realms.usageStats(); } - static Map rolesStoreUsage(@Nullable RolesStore rolesStore) { - if (rolesStore == null) { - return Collections.emptyMap(); - } - return rolesStore.usageStats(); - } - static Map sslUsage(Settings settings) { Map map = new HashMap<>(2); map.put("http", Collections.singletonMap("enabled", HTTP_SSL_ENABLED.get(settings))); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java index aa03f3ceb0e..2a1825f92f5 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.action.filter; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -19,7 +20,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilterChain; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -29,6 +29,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.SecurityContext; import org.elasticsearch.xpack.security.action.SecurityActionMapper; import org.elasticsearch.xpack.security.action.interceptor.RequestInterceptor; @@ -98,13 +99,33 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil final boolean restoreOriginalContext = securityContext.getAuthentication() != null; try { if (licenseState.isAuthAllowed()) { - if (AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action)) { + final boolean useSystemUser = AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action); + // we should always restore the original here because we forcefully changed to the system user + final ThreadContext.StoredContext toRestore = restoreOriginalContext || useSystemUser ? original : () -> {}; + final ActionListener signingListener = new ContextPreservingActionListener<>(toRestore, + ActionListener.wrap(r -> { + try { + listener.onResponse(sign(r)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, listener::onFailure)); + ActionListener authenticatedListener = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + chain.proceed(task, action, request, signingListener); + } + @Override + public void onFailure(Exception e) { + signingListener.onFailure(e); + } + }; + if (useSystemUser) { try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { - applyInternal(task, action, request, new SigningListener(this, listener, original), chain); + applyInternal(action, request, authenticatedListener); } } else { - applyInternal(task, action, request, - new SigningListener(this, listener, restoreOriginalContext ? original : null), chain); + applyInternal(action, request, authenticatedListener); } } else if (SECURITY_ACTION_MATCHER.test(action)) { throw LicenseUtils.newComplianceException(XPackPlugin.SECURITY); @@ -126,7 +147,7 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil return Integer.MIN_VALUE; } - private void applyInternal(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) + private void applyInternal(String action, final ActionRequest request, ActionListener listener) throws IOException { /** here we fallback on the system user. Internal system requests are requests that are triggered by @@ -141,35 +162,34 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil final String securityAction = actionMapper.action(action, request); Authentication authentication = authcService.authenticate(securityAction, request, SystemUser.INSTANCE); assert authentication != null; - authzService.authorize(authentication, securityAction, request); - final User user = authentication.getUser(); - request = unsign(user, securityAction, request); + final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, + (userRoles, runAsRoles) -> { + authzService.authorize(authentication, securityAction, request, userRoles, runAsRoles); + final User user = authentication.getUser(); + unsign(user, securityAction, request); + + /* + * We use a separate concept for code that needs to be run after authentication and authorization that could effect the + * running of the action. This is done to make it more clear of the state of the request. + */ + for (RequestInterceptor interceptor : requestInterceptors) { + if (interceptor.supports(request)) { + interceptor.intercept(request, user); + } + } + listener.onResponse(null); + }); + asyncAuthorizer.authorize(authzService); - /* - * We use a separate concept for code that needs to be run after authentication and authorization that could effect the running of - * the action. This is done to make it more clear of the state of the request. - */ - for (RequestInterceptor interceptor : requestInterceptors) { - if (interceptor.supports(request)) { - interceptor.intercept(request, user); - } - } - // we should always restore the original here because we forcefully changed to the system user - chain.proceed(task, action, request, listener); } - Request unsign(User user, String action, Request request) { - + ActionRequest unsign(User user, String action, final ActionRequest request) { try { - if (request instanceof SearchScrollRequest) { SearchScrollRequest scrollRequest = (SearchScrollRequest) request; String scrollId = scrollRequest.scrollId(); scrollRequest.scrollId(cryptoService.unsignAndVerify(scrollId)); - return request; - } - - if (request instanceof ClearScrollRequest) { + } else if (request instanceof ClearScrollRequest) { ClearScrollRequest clearScrollRequest = (ClearScrollRequest) request; boolean isClearAllScrollRequest = clearScrollRequest.scrollIds().contains("_all"); if (!isClearAllScrollRequest) { @@ -180,64 +200,22 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil } clearScrollRequest.scrollIds(unsignedIds); } - return request; } - - return request; - } catch (IllegalArgumentException | IllegalStateException e) { auditTrail.tamperedRequest(user, action, request); throw authorizationError("invalid request. {}", e.getMessage()); } + return request; } Response sign(Response response) throws IOException { - if (response instanceof SearchResponse) { SearchResponse searchResponse = (SearchResponse) response; String scrollId = searchResponse.getScrollId(); if (scrollId != null && !cryptoService.isSigned(scrollId)) { searchResponse.scrollId(cryptoService.sign(scrollId)); } - return response; } - return response; } - - static class SigningListener implements ActionListener { - - private final SecurityActionFilter filter; - private final ActionListener innerListener; - private final ThreadContext.StoredContext threadContext; - - private SigningListener(SecurityActionFilter filter, ActionListener innerListener, - @Nullable ThreadContext.StoredContext threadContext) { - this.filter = filter; - this.innerListener = innerListener; - this.threadContext = threadContext; - } - - @Override - @SuppressWarnings("unchecked") - public void onResponse(Response response) { - if (threadContext != null) { - threadContext.restore(); - } - try { - response = this.filter.sign(response); - innerListener.onResponse(response); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - if (threadContext != null) { - threadContext.restore(); - } - innerListener.onFailure(e); - } - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index d108eed5aa3..3896b37612e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -30,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xpack.common.GroupedActionListener; import org.elasticsearch.xpack.security.SecurityTemplateService; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Authentication; @@ -49,7 +51,6 @@ import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.XPackUser; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -105,7 +106,8 @@ public class AuthorizationService extends AbstractComponent { * @param request The request * @throws ElasticsearchSecurityException If the given user is no allowed to execute the given request */ - public void authorize(Authentication authentication, String action, TransportRequest request) throws ElasticsearchSecurityException { + public void authorize(Authentication authentication, String action, TransportRequest request, Collection userRoles, + Collection runAsRoles) throws ElasticsearchSecurityException { final TransportRequest originalRequest = request; if (request instanceof ConcreteShardRequest) { request = ((ConcreteShardRequest) request).getRequest(); @@ -122,9 +124,8 @@ public class AuthorizationService extends AbstractComponent { } throw denial(authentication, action, request); } - + Collection roles = userRoles; // get the roles of the authenticated user, which may be different than the effective - Collection roles = roles(authentication.getUser()); GlobalPermission permission = permission(roles); final boolean isRunAs = authentication.getUser() != authentication.getRunAsUser(); @@ -143,7 +144,7 @@ public class AuthorizationService extends AbstractComponent { RunAsPermission runAs = permission.runAs(); if (runAs != null && runAs.check(authentication.getRunAsUser().principal())) { grantRunAs(authentication, action, request); - roles = roles(authentication.getRunAsUser()); + roles = runAsRoles; permission = permission(roles); // permission can be empty as it might be that the run as user's role is unknown if (permission.isEmpty()) { @@ -280,7 +281,7 @@ public class AuthorizationService extends AbstractComponent { return rolesBuilder.build(); } - Collection roles(User user) { + public void roles(User user, ActionListener> roleActionListener) { // we need to special case the internal users in this method, if we apply the anonymous roles to every user including these system // user accounts then we run into the chance of a deadlock because then we need to get a role that we may be trying to get as the // internal user. The SystemUser is special cased as it has special privileges to execute internal actions and should never be @@ -291,7 +292,8 @@ public class AuthorizationService extends AbstractComponent { } if (XPackUser.is(user)) { assert XPackUser.INSTANCE.roles().length == 1 && SuperuserRole.NAME.equals(XPackUser.INSTANCE.roles()[0]); - return Collections.singleton(SuperuserRole.INSTANCE); + roleActionListener.onResponse(Collections.singleton(SuperuserRole.INSTANCE)); + return; } Set roleNames = new HashSet<>(); @@ -302,15 +304,17 @@ public class AuthorizationService extends AbstractComponent { } Collections.addAll(roleNames, anonymousUser.roles()); } - List roles = new ArrayList<>(); - roles.add(DefaultRole.INSTANCE); - for (String roleName : roleNames) { - Role role = rolesStore.role(roleName); - if (role != null) { - roles.add(role); + + final Collection defaultRoles = Collections.singletonList(DefaultRole.INSTANCE); + if (roleNames.isEmpty()) { + roleActionListener.onResponse(defaultRoles); + } else { + final GroupedActionListener listener = new GroupedActionListener<>(roleActionListener, roleNames.size(), + defaultRoles); + for (String roleName : roleNames) { + rolesStore.roles(roleName, listener); } } - return Collections.unmodifiableList(roles); } private static boolean isCompositeAction(String action) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 30e763a88f8..9d126f1121d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -5,12 +5,18 @@ */ package org.elasticsearch.xpack.security.authz; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.support.AutomatonPredicate; import org.elasticsearch.xpack.security.support.Automatons; +import java.util.Collection; +import java.util.Collections; +import java.util.function.BiConsumer; import java.util.function.Predicate; public final class AuthorizationUtils { @@ -59,4 +65,61 @@ public final class AuthorizationUtils { public static boolean isInternalAction(String action) { return INTERNAL_PREDICATE.test(action); } + + /** + * A base class to authorize authorize a given {@link Authentication} against it's users or run-as users roles. + * This class fetches the roles for the users asynchronously and then authenticates the in the callback. + */ + public static class AsyncAuthorizer { + + private final ActionListener listener; + private final BiConsumer, Collection> consumer; + private final Authentication authentication; + private volatile Collection userRoles; + private volatile Collection runAsRoles; + private CountDown countDown = new CountDown(2); // we expect only two responses!! + + public AsyncAuthorizer(Authentication authentication, ActionListener listener, BiConsumer, + Collection> consumer) { + this.consumer = consumer; + this.listener = listener; + this.authentication = authentication; + } + + public void authorize(AuthorizationService service) { + if (SystemUser.is(authentication.getUser())) { + setUserRoles(Collections.emptyList()); // we can inform the listener immediately - nothing to fetch for us on system user + setRunAsRoles(Collections.emptyList()); + } else { + service.roles(authentication.getUser(), ActionListener.wrap(this::setUserRoles, listener::onFailure)); + if (authentication.getUser().equals(authentication.getRunAsUser()) == false) { + assert authentication.getRunAsUser() != null : "runAs user is null but shouldn't"; + service.roles(authentication.getRunAsUser(), ActionListener.wrap(this::setRunAsRoles, listener::onFailure)); + } else { + setRunAsRoles(Collections.emptyList()); + } + } + } + + private void setUserRoles(Collection roles) { + this.userRoles = roles; + maybeRun(); + } + + private void setRunAsRoles(Collection roles) { + this.runAsRoles = roles; + maybeRun(); + } + + private void maybeRun() { + if (countDown.countDown()) { + try { + consumer.accept(userRoles, runAsRoles); + } catch (Exception e) { + listener.onFailure(e); + } + } + } + + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java index 03c8a278891..2cc09de0722 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.authz.store; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.security.authz.permission.Role; @@ -16,7 +17,7 @@ import java.util.Map; * A composite roles store that combines built in roles, file-based roles, and index-based roles. Checks the built in roles first, then the * file roles, and finally the index roles. */ -public class CompositeRolesStore extends AbstractComponent implements RolesStore { +public class CompositeRolesStore extends AbstractComponent { private final FileRolesStore fileRolesStore; private final NativeRolesStore nativeRolesStore; @@ -30,7 +31,7 @@ public class CompositeRolesStore extends AbstractComponent implements RolesStore this.reservedRolesStore = reservedRolesStore; } - public Role role(String role) { + private Role getBuildInRole(String role) { // builtins first Role builtIn = reservedRolesStore.role(role); if (builtIn != null) { @@ -44,15 +45,19 @@ public class CompositeRolesStore extends AbstractComponent implements RolesStore logger.trace("loaded role [{}] from file roles store", role); return fileRole; } - - Role nativeRole = nativeRolesStore.role(role); - if (nativeRole != null) { - logger.trace("loaded role [{}] from native roles store", role); - } - return nativeRole; + return null; } - @Override + public void roles(String role, ActionListener roleActionListener) { + Role storedRole = getBuildInRole(role); + if (storedRole == null) { + nativeRolesStore.role(role, roleActionListener); + } else { + roleActionListener.onResponse(storedRole); + } + } + + public Map usageStats() { Map usage = new HashMap<>(2); usage.put("file", fileRolesStore.usageStats()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java index 136a40b35a0..bdcef269cdd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/FileRolesStore.java @@ -43,7 +43,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableMap; -public class FileRolesStore extends AbstractLifecycleComponent implements RolesStore { +public class FileRolesStore extends AbstractLifecycleComponent { private static final Pattern IN_SEGMENT_LINE = Pattern.compile("^\\s+.+"); private static final Pattern SKIP_LINE = Pattern.compile("(^#.*|^\\s*)"); @@ -86,12 +86,10 @@ public class FileRolesStore extends AbstractLifecycleComponent implements RolesS protected void doClose() throws ElasticsearchException { } - @Override public Role role(String role) { return permissions.get(role); } - @Override public Map usageStats() { Map usageStats = new HashMap<>(); usageStats.put("size", permissions.size()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index de8be2db93d..5816024982b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -25,6 +24,7 @@ import org.elasticsearch.action.search.MultiSearchResponse.Item; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -46,6 +46,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.SecurityTemplateService; import org.elasticsearch.xpack.security.action.role.ClearRolesCacheRequest; @@ -63,9 +64,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -83,7 +83,7 @@ import static org.elasticsearch.xpack.security.SecurityTemplateService.securityI * * No caching is done by this class, it is handled at a higher level */ -public class NativeRolesStore extends AbstractComponent implements RolesStore, ClusterStateListener { +public class NativeRolesStore extends AbstractComponent implements ClusterStateListener { public static final Setting SCROLL_SIZE_SETTING = Setting.intSetting(setting("authz.store.roles.index.scroll.size"), 1000, Property.NodeScope); @@ -127,6 +127,8 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C private SecurityClient securityClient; private int scrollSize; private TimeValue scrollKeepAlive; + // incremented each time the cache is invalidated + private final AtomicLong numInvalidation = new AtomicLong(0); private volatile boolean securityIndexExists = false; @@ -277,8 +279,17 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C logger.trace("attempted to get role [{}] before service was started", role); listener.onResponse(null); } - RoleAndVersion roleAndVersion = getRoleAndVersion(role); - listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRoleDescriptor()); + getRoleAndVersion(role, new ActionListener() { + @Override + public void onResponse(RoleAndVersion roleAndVersion) { + listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRoleDescriptor()); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } public void deleteRole(final DeleteRoleRequest deleteRoleRequest, final ActionListener listener) { @@ -352,16 +363,25 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C } - @Override - public Role role(String roleName) { + public void role(String roleName, ActionListener listener) { if (state() != State.STARTED) { - return null; + listener.onResponse(null); + } else { + getRoleAndVersion(roleName, new ActionListener() { + @Override + public void onResponse(RoleAndVersion roleAndVersion) { + listener.onResponse(roleAndVersion == null ? null : roleAndVersion.getRole()); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + + } + }); } - RoleAndVersion roleAndVersion = getRoleAndVersion(roleName); - return roleAndVersion == null ? null : roleAndVersion.getRole(); } - @Override public Map usageStats() { if (state() != State.STARTED) { return Collections.emptyMap(); @@ -445,70 +465,61 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C return usageStats; } - private RoleAndVersion getRoleAndVersion(final String roleId) { + private void getRoleAndVersion(final String roleId, ActionListener roleActionListener) { if (securityIndexExists == false) { - return null; - } - - RoleAndVersion roleAndVersion = null; - final AtomicReference getRef = new AtomicReference<>(null); - final CountDownLatch latch = new CountDownLatch(1); - try { - roleAndVersion = roleCache.computeIfAbsent(roleId, (key) -> { - logger.debug("attempting to load role [{}] from index", key); - executeGetRoleRequest(roleId, new LatchedActionListener<>(new ActionListener() { + roleActionListener.onResponse(null); + } else { + RoleAndVersion cachedRoleAndVersion = roleCache.get(roleId); + if (cachedRoleAndVersion == null) { + final long invalidationCounter = numInvalidation.get(); + executeGetRoleRequest(roleId, new ActionListener() { @Override - public void onResponse(GetResponse role) { - getRef.set(role); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexNotFoundException) { - logger.trace( - (Supplier) () -> new ParameterizedMessage( - "failed to retrieve role [{}] since security index does not exist", roleId), t); - } else { - logger.error((Supplier) () -> new ParameterizedMessage("failed to retrieve role [{}]", roleId), t); + public void onResponse(GetResponse response) { + RoleDescriptor descriptor = transformRole(response); + RoleAndVersion roleAndVersion = null; + if (descriptor != null) { + logger.debug("loaded role [{}] from index with version [{}]", roleId, response.getVersion()); + RoleAndVersion fetchedRoleAndVersion = new RoleAndVersion(descriptor, response.getVersion()); + roleAndVersion = fetchedRoleAndVersion; + if (fetchedRoleAndVersion != null) { + /* this is kinda spooky. We use a read/write lock to ensure we don't modify the cache if we hold the write + * lock (fetching stats for instance - which is kinda overkill?) but since we fetching stuff in an async + * fashion we need to make sure that if the cacht got invalidated since we started the request we don't + * put a potential stale result in the cache, hence the numInvalidation.get() comparison to the number of + * invalidation when we started. we just try to be on the safe side and don't cache potentially stale + * results*/ + try (final ReleasableLock ignored = readLock.acquire()) { + if (invalidationCounter == numInvalidation.get()) { + roleCache.computeIfAbsent(roleId, (k) -> fetchedRoleAndVersion); + } + } catch (ExecutionException e) { + throw new AssertionError("failed to load constant non-null value", e); + } + } else { + logger.trace("role [{}] was not found", roleId); + } } + roleActionListener.onResponse(roleAndVersion); } - }, latch)); - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.error("timed out retrieving role [{}]", roleId); - } - - GetResponse response = getRef.get(); - if (response == null) { - return null; - } - - RoleDescriptor descriptor = transformRole(response); - if (descriptor == null) { - return null; - } - logger.debug("loaded role [{}] from index with version [{}]", key, response.getVersion()); - try (final ReleasableLock ignored = readLock.acquire()) { - return new RoleAndVersion(descriptor, response.getVersion()); - } - }); - } catch (ExecutionException e) { - if (e.getCause() instanceof NullPointerException) { - logger.trace((Supplier) () -> new ParameterizedMessage("role [{}] was not found", roleId), e); + @Override + public void onFailure(Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); + roleActionListener.onFailure(e); + } + }); } else { - logger.error((Supplier) () -> new ParameterizedMessage("failed to load role [{}]", roleId), e); + roleActionListener.onResponse(cachedRoleAndVersion); } } - - return roleAndVersion; } private void executeGetRoleRequest(String role, ActionListener listener) { try { GetRequest request = client.prepareGet(SecurityTemplateService.SECURITY_INDEX_NAME, ROLE_DOC_TYPE, role).request(); - client.get(request, listener); + // TODO we use a threaded listener here to make sure we don't execute on a transport thread. This can be removed once + // all blocking operations are removed from this and NativeUserStore + client.get(request, new ThreadedActionListener<>(logger, client.threadPool(), ThreadPool.Names.LISTENER, listener, true)); } catch (IndexNotFoundException e) { logger.trace( (Supplier) () -> new ParameterizedMessage( @@ -551,6 +562,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C public void invalidateAll() { logger.debug("invalidating all roles in cache"); + numInvalidation.incrementAndGet(); try (final ReleasableLock ignored = readLock.acquire()) { roleCache.invalidateAll(); } @@ -558,6 +570,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C public void invalidate(String role) { logger.debug("invalidating role [{}] in cache", role); + numInvalidation.incrementAndGet(); try (final ReleasableLock ignored = readLock.acquire()) { roleCache.invalidate(role); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java index 4eaf675a492..b4079f189dc 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/ReservedRolesStore.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.security.user.KibanaUser; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; -public class ReservedRolesStore implements RolesStore { +public class ReservedRolesStore { private static final User DEFAULT_ENABLED_KIBANA_USER = new KibanaUser(true); private final SecurityContext securityContext; @@ -36,7 +36,6 @@ public class ReservedRolesStore implements RolesStore { this.securityContext = securityContext; } - @Override public Role role(String role) { switch (role) { case SuperuserRole.NAME: @@ -67,7 +66,6 @@ public class ReservedRolesStore implements RolesStore { } } - @Override public Map usageStats() { return Collections.emptyMap(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java deleted file mode 100644 index eeb3ab8862b..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/store/RolesStore.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.security.authz.store; - -import org.elasticsearch.xpack.security.authz.permission.Role; - -import java.util.Map; - -/** - * An interface for looking up a role given a string role name - */ -public interface RolesStore { - - Role role(String role); - - Map usageStats(); -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index bd22dc413dd..5c390dd8db3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.transport; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.security.authz.AuthorizationService; import org.elasticsearch.xpack.security.authz.AuthorizationUtils; import org.elasticsearch.xpack.security.authz.accesscontrol.RequestContext; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.security.support.Exceptions; import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport; import org.elasticsearch.tasks.Task; @@ -29,9 +31,13 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.xpack.security.user.SystemUser; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.function.Consumer; import static org.elasticsearch.xpack.XPackSettings.TRANSPORT_SSL_ENABLED; import static org.elasticsearch.xpack.security.Security.setting; @@ -100,8 +106,8 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor @Override public TransportRequestHandler interceptHandler(String action, String executor, TransportRequestHandler actualHandler) { - return new ProfileSecuredRequestHandler<>(action, actualHandler, profileFilters, - licenseState, threadPool.getThreadContext()); + return new ProfileSecuredRequestHandler<>(action, executor, actualHandler, profileFilters, + licenseState, threadPool); } @@ -150,20 +156,45 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor private final Map profileFilters; private final XPackLicenseState licenseState; private final ThreadContext threadContext; + private final String executorName; + private final ThreadPool threadPool; - private ProfileSecuredRequestHandler(String action, TransportRequestHandler handler, + private ProfileSecuredRequestHandler(String action, String executorName, TransportRequestHandler handler, Map profileFilters, XPackLicenseState licenseState, - ThreadContext threadContext) { + ThreadPool threadPool) { this.action = action; + this.executorName = executorName; this.handler = handler; this.profileFilters = profileFilters; this.licenseState = licenseState; - this.threadContext = threadContext; + this.threadContext = threadPool.getThreadContext(); + this.threadPool = threadPool; } @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + final Consumer onFailure = (e) -> { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + }; + final Runnable receiveMessage = () -> { + // FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to + // the query cache + RequestContext context = new RequestContext(request, threadContext); + RequestContext.setCurrent(context); + try { + handler.messageReceived(request, channel, task); + } catch (Exception e) { + onFailure.accept(e); + } finally { + RequestContext.removeCurrent(); + } + }; try (ThreadContext.StoredContext ctx = threadContext.newStoredContext()) { + if (licenseState.isAuthAllowed()) { String profile = channel.getProfileName(); ServerTransportFilter filter = profileFilters.get(profile); @@ -178,16 +209,35 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor } } assert filter != null; - filter.inbound(action, request, channel); + final Thread executingThread = Thread.currentThread(); + Consumer consumer = (x) -> { + final Executor executor; + if (executingThread == Thread.currentThread()) { + // only fork off if we get called on another thread this means we moved to + // an async execution and in this case we need to go back to the thread pool + // that was actually executing it. it's also possible that the + // thread-pool we are supposed to execute on is `SAME` in that case + // the handler is OK with executing on a network thread and we can just continue even if + // we are on another thread due to async operations + executor = threadPool.executor(ThreadPool.Names.SAME); + } else { + executor = threadPool.executor(executorName); + } + + try { + executor.execute(receiveMessage); + } catch (Exception e) { + onFailure.accept(e); + } + + }; + ActionListener filterListener = ActionListener.wrap(consumer, onFailure); + filter.inbound(action, request, channel, filterListener); + } else { + receiveMessage.run(); } - // FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to the query cache - RequestContext context = new RequestContext(request, threadContext); - RequestContext.setCurrent(context); - handler.messageReceived(request, channel, task); } catch (Exception e) { channel.sendResponse(e); - } finally { - RequestContext.removeCurrent(); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index cb09dfc43e9..500ce9e5b3c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.transport; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.DelegatingTransportChannel; @@ -19,6 +20,8 @@ import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authc.pki.PkiRealm; import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.authz.AuthorizationUtils; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.jboss.netty.channel.Channel; import org.jboss.netty.handler.ssl.SslHandler; @@ -27,6 +30,7 @@ import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; import java.security.cert.Certificate; import java.security.cert.X509Certificate; +import java.util.Collection; import static org.elasticsearch.xpack.security.support.Exceptions.authenticationError; @@ -43,7 +47,8 @@ public interface ServerTransportFilter { * thrown by this method will stop the request from being handled and the error will * be sent back to the sender. */ - void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException; + void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException; /** * The server trasnport filter that should be used in nodes as it ensures that an incoming @@ -67,7 +72,8 @@ public interface ServerTransportFilter { } @Override - public void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException { + public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException { /* here we don't have a fallback user, as all incoming request are expected to have a user attached (either in headers or in context) @@ -97,9 +103,13 @@ public interface ServerTransportFilter { } } } - - Authentication authentication = authcService.authenticate(securityAction, request, null); - authzService.authorize(authentication, securityAction, request); + final Authentication authentication = authcService.authenticate(securityAction, request, null); + final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, + (userRoles, runAsRoles) -> { + authzService.authorize(authentication, securityAction, request, userRoles, runAsRoles); + listener.onResponse(null); + }); + asyncAuthorizer.authorize(authzService); } private void extactClientCertificates(SSLEngine sslEngine, Object channel) { @@ -138,13 +148,14 @@ public interface ServerTransportFilter { } @Override - public void inbound(String action, TransportRequest request, TransportChannel transportChannel) throws IOException { + public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) + throws IOException { // TODO is ']' sufficient to mark as shard action? boolean isInternalOrShardAction = action.startsWith("internal:") || action.endsWith("]"); if (isInternalOrShardAction) { throw authenticationError("executing internal/shard actions is considered malicious and forbidden"); } - super.inbound(action, request, transportChannel); + super.inbound(action, request, transportChannel, listener); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java b/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java index 7dc924702f4..a37109afd39 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/integration/ClearRolesCacheTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.integration; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.security.action.role.DeleteRoleResponse; import org.elasticsearch.xpack.security.action.role.GetRolesResponse; import org.elasticsearch.xpack.security.action.role.PutRoleResponse; import org.elasticsearch.xpack.security.authz.permission.FieldPermissions; +import org.elasticsearch.xpack.security.authz.permission.Role; import org.elasticsearch.xpack.security.authz.store.NativeRolesStore; import org.elasticsearch.xpack.security.client.SecurityClient; import org.junit.Before; @@ -60,7 +62,9 @@ public class ClearRolesCacheTests extends NativeRealmIntegTestCase { // warm up the caches on every node for (NativeRolesStore rolesStore : internalCluster().getInstances(NativeRolesStore.class)) { for (String role : roles) { - assertThat(rolesStore.role(role), notNullValue()); + PlainActionFuture future = new PlainActionFuture<>(); + rolesStore.role(role, future); + assertThat(future.actionGet(), notNullValue()); } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java new file mode 100644 index 00000000000..ae38f5b36fc --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/common/GroupedActionListenerTests.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class GroupedActionListenerTests extends ESTestCase { + + public void testNotifications() throws InterruptedException { + AtomicReference> resRef = new AtomicReference<>(); + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + final int groupSize = randomIntBetween(10, 1000); + AtomicInteger count = new AtomicInteger(); + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList(); + GroupedActionListener listener = new GroupedActionListener<>(result, groupSize, defaults); + int numThreads = randomIntBetween(2, 5); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError(e); + } + int c = 0; + while((c = count.incrementAndGet()) <= groupSize) { + listener.onResponse(c-1); + } + } + }; + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + assertNotNull(resRef.get()); + ArrayList list = new ArrayList<>(resRef.get()); + Collections.sort(list); + int expectedSize = groupSize + defaults.size(); + assertEquals(expectedSize, resRef.get().size()); + int expectedValue = defaults.isEmpty() ? 0 : -1; + for (int i = 0; i < expectedSize; i++) { + assertEquals(Integer.valueOf(expectedValue++), list.get(i)); + } + } + + public void testFailed() { + AtomicReference> resRef = new AtomicReference<>(); + AtomicReference excRef = new AtomicReference<>(); + + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + excRef.set(e); + } + }; + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList(); + int size = randomIntBetween(3, 4); + GroupedActionListener listener = new GroupedActionListener<>(result, size, defaults); + listener.onResponse(0); + IOException ioException = new IOException(); + RuntimeException rtException = new RuntimeException(); + listener.onFailure(rtException); + listener.onFailure(ioException); + if (size == 4) { + listener.onResponse(2); + } + assertNotNull(excRef.get()); + assertEquals(rtException, excRef.get()); + assertEquals(1, excRef.get().getSuppressed().length); + assertEquals(ioException, excRef.get().getSuppressed()[0]); + assertNull(resRef.get()); + listener.onResponse(1); + assertNull(resRef.get()); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java index dcae61a7742..59470300949 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/InternalClientTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.security; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -17,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.crypto.CryptoService; @@ -25,7 +23,6 @@ import org.elasticsearch.xpack.security.crypto.CryptoService; import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class InternalClientTests extends ESTestCase { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java index 1d314764236..8dff4dd5603 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilterTests.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.security.action.filter; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import org.elasticsearch.ElasticsearchSecurityException; @@ -17,8 +19,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.common.ContextPreservingActionListener; import org.elasticsearch.xpack.security.SecurityContext; -import org.elasticsearch.xpack.security.action.SecurityActionMapper; import org.elasticsearch.xpack.security.audit.AuditTrailService; import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authc.Authentication.RealmRef; @@ -31,8 +33,10 @@ import org.elasticsearch.license.XPackLicenseState; import org.junit.Before; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -72,11 +76,18 @@ public class SecurityActionFilterTests extends ESTestCase { Task task = mock(Task.class); User user = new User("username", "r1", "r2"); Authentication authentication = new Authentication(user, new RealmRef("test", "test", "foo"), null); + when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); doReturn(request).when(spy(filter)).unsign(user, "_action", request); filter.apply(task, "_action", request, listener, chain); - verify(authzService).authorize(authentication, "_action", request); - verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(SecurityActionFilter.SigningListener.class)); + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ContextPreservingActionListener.class)); } public void testActionProcessException() throws Exception { @@ -88,7 +99,14 @@ public class SecurityActionFilterTests extends ESTestCase { User user = new User("username", "r1", "r2"); Authentication authentication = new Authentication(user, new RealmRef("test", "test", "foo"), null); when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); - doThrow(exception).when(authzService).authorize(authentication, "_action", request); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); + doThrow(exception).when(authzService).authorize(eq(authentication), eq("_action"), eq(request), any(Collection.class), + any(Collection.class)); filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(exception); verifyNoMoreInteractions(chain); @@ -104,10 +122,17 @@ public class SecurityActionFilterTests extends ESTestCase { when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); when(cryptoService.isSigned("signed_scroll_id")).thenReturn(true); when(cryptoService.unsignAndVerify("signed_scroll_id")).thenReturn("scroll_id"); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); filter.apply(task, "_action", request, listener, chain); assertThat(request.scrollId(), equalTo("scroll_id")); - verify(authzService).authorize(authentication, "_action", request); - verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(SecurityActionFilter.SigningListener.class)); + + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ContextPreservingActionListener.class)); } public void testActionSignatureError() throws Exception { @@ -121,6 +146,12 @@ public class SecurityActionFilterTests extends ESTestCase { when(authcService.authenticate("_action", request, SystemUser.INSTANCE)).thenReturn(authentication); when(cryptoService.isSigned("scroll_id")).thenReturn(true); doThrow(sigException).when(cryptoService).unsignAndVerify("scroll_id"); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(isA(ElasticsearchSecurityException.class)); verify(auditTrail).tamperedRequest(user, "_action", request); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 2a2b7cda677..c7bcf3c8881 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; @@ -54,6 +55,7 @@ import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsAction; @@ -95,6 +97,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -109,6 +112,9 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -118,11 +124,12 @@ import static org.mockito.Mockito.when; public class AuthorizationServiceTests extends ESTestCase { private AuditTrailService auditTrail; - private CompositeRolesStore rolesStore; private ClusterService clusterService; private AuthorizationService authorizationService; private ThreadContext threadContext; private ThreadPool threadPool; + private Map roleMap = new HashMap<>(); + private CompositeRolesStore rolesStore; @Before public void setup() { @@ -132,19 +139,34 @@ public class AuthorizationServiceTests extends ESTestCase { threadContext = new ThreadContext(Settings.EMPTY); threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(threadContext); - + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(roleMap.get((String)i.getArguments()[0])); + return Void.TYPE; + }).when(rolesStore).roles(any(String.class), any(ActionListener.class)); authorizationService = new AuthorizationService(Settings.EMPTY, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, new AnonymousUser(Settings.EMPTY)); } + private void authorize(Authentication authentication, String action, TransportRequest request) { + PlainActionFuture future = new PlainActionFuture(); + AuthorizationUtils.AsyncAuthorizer authorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, future, + (userRoles, runAsRoles) -> {authorizationService.authorize(authentication, action, request, userRoles, runAsRoles); + future.onResponse(null); + }); + authorizer.authorize(authorizationService); + future.actionGet(); + } + public void testActionsSystemUserIsAuthorized() { TransportRequest request = mock(TransportRequest.class); // A failure would throw an exception - authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "indices:monitor/whatever", request); + authorize(createAuthentication(SystemUser.INSTANCE), "indices:monitor/whatever", request); verify(auditTrail).accessGranted(SystemUser.INSTANCE, "indices:monitor/whatever", request); - authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "internal:whatever", request); + authorize(createAuthentication(SystemUser.INSTANCE), "internal:whatever", request); verify(auditTrail).accessGranted(SystemUser.INSTANCE, "internal:whatever", request); verifyNoMoreInteractions(auditTrail); } @@ -152,7 +174,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testIndicesActionsAreNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "indices:", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "indices:", request), "indices:", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "indices:", request); verifyNoMoreInteractions(auditTrail); @@ -161,7 +183,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testClusterAdminActionsAreNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/whatever", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/whatever", request), "cluster:admin/whatever", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "cluster:admin/whatever", request); verifyNoMoreInteractions(auditTrail); @@ -170,7 +192,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testClusterAdminSnapshotStatusActionIsNotAuthorized() { TransportRequest request = mock(TransportRequest.class); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/snapshot/status", request), + () -> authorize(createAuthentication(SystemUser.INSTANCE), "cluster:admin/snapshot/status", request), "cluster:admin/snapshot/status", SystemUser.INSTANCE.principal()); verify(auditTrail).accessDenied(SystemUser.INSTANCE, "cluster:admin/snapshot/status", request); verifyNoMoreInteractions(auditTrail); @@ -181,7 +203,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user"); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -192,7 +214,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user", "non-existent-role"); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -201,10 +223,10 @@ public class AuthorizationServiceTests extends ESTestCase { public void testThatNonIndicesAndNonClusterActionIsDenied() { TransportRequest request = mock(TransportRequest.class); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "whatever", request), + () -> authorize(createAuthentication(user), "whatever", request), "whatever", "test user"); verify(auditTrail).accessDenied(user, "whatever", request); verifyNoMoreInteractions(auditTrail); @@ -213,11 +235,11 @@ public class AuthorizationServiceTests extends ESTestCase { public void testThatRoleWithNoIndicesIsDenied() { TransportRequest request = new IndicesExistsRequest("a"); User user = new User("test user", "no_indices"); - when(rolesStore.role("no_indices")).thenReturn(Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); + roleMap.put("no_indices", Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); mockEmptyMetaData(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -225,7 +247,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testSearchAgainstEmptyCluster() { User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); { @@ -234,7 +256,7 @@ public class AuthorizationServiceTests extends ESTestCase { .indicesOptions(IndicesOptions.fromOptions(false, true, true, false)); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest), + () -> authorize(createAuthentication(user), SearchAction.NAME, searchRequest), SearchAction.NAME, "test user"); verify(auditTrail).accessDenied(user, SearchAction.NAME, searchRequest); verifyNoMoreInteractions(auditTrail); @@ -244,7 +266,7 @@ public class AuthorizationServiceTests extends ESTestCase { //ignore_unavailable and allow_no_indices both set to true, user is not authorized for this index nor does it exist SearchRequest searchRequest = new SearchRequest("does_not_exist") .indicesOptions(IndicesOptions.fromOptions(true, true, true, false)); - authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest); + authorize(createAuthentication(user), SearchAction.NAME, searchRequest); verify(auditTrail).accessGranted(user, SearchAction.NAME, searchRequest); IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationService.INDICES_PERMISSIONS_KEY); IndicesAccessControl.IndexAccessControl indexAccessControl = @@ -256,33 +278,32 @@ public class AuthorizationServiceTests extends ESTestCase { public void testScrollRelatedRequestsAllowed() { User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_role").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - authorizationService.authorize(createAuthentication(user), ClearScrollAction.NAME, clearScrollRequest); + authorize(createAuthentication(user), ClearScrollAction.NAME, clearScrollRequest); verify(auditTrail).accessGranted(user, ClearScrollAction.NAME, clearScrollRequest); SearchScrollRequest searchScrollRequest = new SearchScrollRequest(); - authorizationService.authorize(createAuthentication(user), SearchScrollAction.NAME, searchScrollRequest); + authorize(createAuthentication(user), SearchScrollAction.NAME, searchScrollRequest); verify(auditTrail).accessGranted(user, SearchScrollAction.NAME, searchScrollRequest); // We have to use a mock request for other Scroll actions as the actual requests are package private to SearchTransportService TransportRequest request = mock(TransportRequest.class); - authorizationService - .authorize(createAuthentication(user), SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.QUERY_SCROLL_ACTION_NAME, request); - authorizationService.authorize(createAuthentication(user), SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); + authorize(createAuthentication(user), SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); verify(auditTrail).accessGranted(user, SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, request); verifyNoMoreInteractions(auditTrail); } @@ -291,10 +312,10 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("b"); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user"); verify(auditTrail).accessDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -307,10 +328,10 @@ public class AuthorizationServiceTests extends ESTestCase { request.alias(new Alias("a2")); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), CreateIndexAction.NAME, request), + () -> authorize(createAuthentication(user), CreateIndexAction.NAME, request), IndicesAliasesAction.NAME, "test user"); verify(auditTrail).accessDenied(user, IndicesAliasesAction.NAME, request); verifyNoMoreInteractions(auditTrail); @@ -323,9 +344,9 @@ public class AuthorizationServiceTests extends ESTestCase { request.alias(new Alias("a2")); ClusterState state = mockEmptyMetaData(); User user = new User("test user", "a_all"); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a", "a2").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a", "a2").build()); - authorizationService.authorize(createAuthentication(user), CreateIndexAction.NAME, request); + authorize(createAuthentication(user), CreateIndexAction.NAME, request); verify(auditTrail).accessGranted(user, CreateIndexAction.NAME, request); verifyNoMoreInteractions(auditTrail); @@ -341,10 +362,10 @@ public class AuthorizationServiceTests extends ESTestCase { authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(anonymousUser), "indices:a", request), + () -> authorize(createAuthentication(anonymousUser), "indices:a", request), "indices:a", anonymousUser.principal()); verify(auditTrail).accessDenied(anonymousUser, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -363,10 +384,10 @@ public class AuthorizationServiceTests extends ESTestCase { authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, new AnonymousUser(settings)); - when(rolesStore.role("a_all")).thenReturn(Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("a_all", Role.builder("a_all").add(IndexPrivilege.ALL, "a").build()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, - () -> authorizationService.authorize(createAuthentication(anonymousUser), "indices:a", request)); + () -> authorize(createAuthentication(anonymousUser), "indices:a", request)); assertAuthenticationException(securityException, containsString("action [indices:a] requires authentication")); verify(auditTrail).accessDenied(anonymousUser, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -379,7 +400,7 @@ public class AuthorizationServiceTests extends ESTestCase { User user = new User("test user", null, new User("run as me", new String[] { "admin" })); assertThat(user.runAs(), is(notNullValue())); assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); // run as [run as me] verify(auditTrail).runAsDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -389,14 +410,14 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = mock(TransportRequest.class); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "doesn't exist")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "not the right user")) .add(IndexPrivilege.ALL, "a") .build()); assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); verify(auditTrail).runAsDenied(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -406,7 +427,7 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("a"); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "b")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "run as me")) .add(IndexPrivilege.ALL, "a") @@ -420,7 +441,7 @@ public class AuthorizationServiceTests extends ESTestCase { .settings(Settings.builder().put("index.version.created", Version.CURRENT).build()) .numberOfShards(1).numberOfReplicas(0).build(), true) .build()); - when(rolesStore.role("b")).thenReturn(Role + roleMap.put("b", Role .builder("b") .add(IndexPrivilege.ALL, "b") .build()); @@ -429,7 +450,7 @@ public class AuthorizationServiceTests extends ESTestCase { } assertThrowsAuthorizationExceptionRunAs( - () -> authorizationService.authorize(createAuthentication(user), "indices:a", request), + () -> authorize(createAuthentication(user), "indices:a", request), "indices:a", "test user", "run as me"); verify(auditTrail).runAsGranted(user, "indices:a", request); verify(auditTrail).accessDenied(user, "indices:a", request); @@ -440,7 +461,7 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new GetIndexRequest().indices("b"); User user = new User("test user", new String[] { "can run as" }, new User("run as me", "b")); assertThat(user.runAs(), is(notNullValue())); - when(rolesStore.role("can run as")).thenReturn(Role + roleMap.put("can run as", Role .builder("can run as") .runAs(new GeneralPrivilege("", "run as me")) .add(IndexPrivilege.ALL, "a") @@ -452,12 +473,12 @@ public class AuthorizationServiceTests extends ESTestCase { .settings(Settings.builder().put("index.version.created", Version.CURRENT).build()) .numberOfShards(1).numberOfReplicas(0).build(), true) .build()); - when(rolesStore.role("b")).thenReturn(Role + roleMap.put("b", Role .builder("b") .add(IndexPrivilege.ALL, "b") .build()); - authorizationService.authorize(createAuthentication(user), "indices:a", request); + authorize(createAuthentication(user), "indices:a", request); verify(auditTrail).runAsGranted(user, "indices:a", request); verify(auditTrail).accessGranted(user, "indices:a", request); verifyNoMoreInteractions(auditTrail); @@ -465,7 +486,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testNonXPackUserCannotExecuteOperationAgainstSecurityIndex() { User user = new User("all_access_user", "all_access"); - when(rolesStore.role("all_access")).thenReturn(Role.builder("all_access") + roleMap.put("all_access", Role.builder("all_access") .add(IndexPrivilege.ALL, "*") .cluster(ClusterPrivilege.ALL) .build()); @@ -496,7 +517,7 @@ public class AuthorizationServiceTests extends ESTestCase { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), action, request), + () -> authorize(createAuthentication(user), action, request), action, "all_access_user"); verify(auditTrail).accessDenied(user, action, request); verifyNoMoreInteractions(auditTrail); @@ -504,23 +525,23 @@ public class AuthorizationServiceTests extends ESTestCase { // we should allow waiting for the health of the index or any index if the user has this permission ClusterHealthRequest request = new ClusterHealthRequest(SecurityTemplateService.SECURITY_INDEX_NAME); - authorizationService.authorize(createAuthentication(user), ClusterHealthAction.NAME, request); + authorize(createAuthentication(user), ClusterHealthAction.NAME, request); verify(auditTrail).accessGranted(user, ClusterHealthAction.NAME, request); // multiple indices request = new ClusterHealthRequest(SecurityTemplateService.SECURITY_INDEX_NAME, "foo", "bar"); - authorizationService.authorize(createAuthentication(user), ClusterHealthAction.NAME, request); + authorize(createAuthentication(user), ClusterHealthAction.NAME, request); verify(auditTrail).accessGranted(user, ClusterHealthAction.NAME, request); SearchRequest searchRequest = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(user), SearchAction.NAME, searchRequest); + authorize(createAuthentication(user), SearchAction.NAME, searchRequest); assertEquals(2, searchRequest.indices().length); assertEquals(IndicesAndAliasesResolver.NO_INDICES_LIST, Arrays.asList(searchRequest.indices())); } public void testGrantedNonXPackUserCanExecuteMonitoringOperationsAgainstSecurityIndex() { User user = new User("all_access_user", "all_access"); - when(rolesStore.role("all_access")).thenReturn(Role.builder("all_access") + roleMap.put("all_access", Role.builder("all_access") .add(IndexPrivilege.ALL, "*") .cluster(ClusterPrivilege.ALL) .build()); @@ -546,14 +567,14 @@ public class AuthorizationServiceTests extends ESTestCase { for (Tuple requestTuple : requests) { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); - authorizationService.authorize(createAuthentication(user), action, request); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); } } public void testXPackUserAndSuperusersCanExecuteOperationAgainstSecurityIndex() { final User superuser = new User("custom_admin", SuperuserRole.NAME); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); ClusterState state = mock(ClusterState.class); when(clusterService.state()).thenReturn(state); when(state.metaData()).thenReturn(MetaData.builder() @@ -582,7 +603,7 @@ public class AuthorizationServiceTests extends ESTestCase { for (Tuple requestTuple : requests) { String action = requestTuple.v1(); TransportRequest request = requestTuple.v2(); - authorizationService.authorize(createAuthentication(user), action, request); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); } } @@ -590,7 +611,7 @@ public class AuthorizationServiceTests extends ESTestCase { public void testXPackUserAndSuperusersCanExecuteOperationAgainstSecurityIndexWithWildcard() { final User superuser = new User("custom_admin", SuperuserRole.NAME); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); ClusterState state = mock(ClusterState.class); when(clusterService.state()).thenReturn(state); when(state.metaData()).thenReturn(MetaData.builder() @@ -601,12 +622,12 @@ public class AuthorizationServiceTests extends ESTestCase { String action = SearchAction.NAME; SearchRequest request = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(XPackUser.INSTANCE), action, request); + authorize(createAuthentication(XPackUser.INSTANCE), action, request); verify(auditTrail).accessGranted(XPackUser.INSTANCE, action, request); assertThat(request.indices(), arrayContaining(".security")); request = new SearchRequest("_all"); - authorizationService.authorize(createAuthentication(superuser), action, request); + authorize(createAuthentication(superuser), action, request); verify(auditTrail).accessGranted(superuser, action, request); assertThat(request.indices(), arrayContaining(".security")); } @@ -617,25 +638,26 @@ public class AuthorizationServiceTests extends ESTestCase { final AnonymousUser anonymousUser = new AnonymousUser(settings); authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("anonymous_user_role")) - .thenReturn(Role.builder("anonymous_user_role") + roleMap.put("anonymous_user_role", Role.builder("anonymous_user_role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); mockEmptyMetaData(); // sanity check the anonymous user - authorizationService.authorize(createAuthentication(anonymousUser), ClusterHealthAction.NAME, request); - authorizationService.authorize(createAuthentication(anonymousUser), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); + authorize(createAuthentication(anonymousUser), ClusterHealthAction.NAME, request); + authorize(createAuthentication(anonymousUser), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); // test the no role user final User userWithNoRoles = new User("no role user"); - authorizationService.authorize(createAuthentication(userWithNoRoles), ClusterHealthAction.NAME, request); - authorizationService.authorize(createAuthentication(userWithNoRoles), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); + authorize(createAuthentication(userWithNoRoles), ClusterHealthAction.NAME, request); + authorize(createAuthentication(userWithNoRoles), IndicesExistsAction.NAME, new IndicesExistsRequest("a")); } public void testDefaultRoleUserWithoutRoles() { - Collection roles = authorizationService.roles(new User("no role user")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("no role user"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(1, roles.size()); assertEquals(DefaultRole.NAME, roles.iterator().next().name()); } @@ -645,13 +667,14 @@ public class AuthorizationServiceTests extends ESTestCase { final AnonymousUser anonymousUser = new AnonymousUser(settings); authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail, new DefaultAuthenticationFailureHandler(), threadPool, anonymousUser); - when(rolesStore.role("anonymous_user_role")) - .thenReturn(Role.builder("anonymous_user_role") + roleMap.put("anonymous_user_role", Role.builder("anonymous_user_role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); mockEmptyMetaData(); - Collection roles = authorizationService.roles(new User("no role user")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("no role user"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(2, roles.size()); for (Role role : roles) { assertThat(role.name(), either(equalTo(DefaultRole.NAME)).or(equalTo("anonymous_user_role"))); @@ -659,12 +682,13 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testDefaultRoleUserWithSomeRole() { - when(rolesStore.role("role")) - .thenReturn(Role.builder("role") + roleMap.put("role", Role.builder("role") .cluster(ClusterPrivilege.ALL) .add(IndexPrivilege.ALL, "a") .build()); - Collection roles = authorizationService.roles(new User("user with role", "role")); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(new User("user with role", "role"), rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertEquals(2, roles.size()); for (Role role : roles) { assertThat(role.name(), either(equalTo(DefaultRole.NAME)).or(equalTo("role"))); @@ -677,9 +701,9 @@ public class AuthorizationServiceTests extends ESTestCase { String action = compositeRequest.v1(); TransportRequest request = compositeRequest.v2(); User user = new User("test user", "no_indices"); - when(rolesStore.role("no_indices")).thenReturn(Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); + roleMap.put("no_indices", Role.builder("no_indices").cluster(ClusterPrivilege.action("")).build()); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(user), action, request), action, "test user"); + () -> authorize(createAuthentication(user), action, request), action, "test user"); verify(auditTrail).accessDenied(user, action, request); verifyNoMoreInteractions(auditTrail); } @@ -690,8 +714,9 @@ public class AuthorizationServiceTests extends ESTestCase { String action = compositeRequest.v1(); TransportRequest request = compositeRequest.v2(); User user = new User("test user", "role"); - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, randomBoolean() ? "a" : "index").build()); - authorizationService.authorize(createAuthentication(user), action, request); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, + randomBoolean() ? "a" : "index").build()); + authorize(createAuthentication(user), action, request); verify(auditTrail).accessGranted(user, action, request); verifyNoMoreInteractions(auditTrail); } @@ -700,9 +725,10 @@ public class AuthorizationServiceTests extends ESTestCase { String action = randomCompositeRequest().v1(); TransportRequest request = mock(TransportRequest.class); User user = new User("test user", "role"); - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, randomBoolean() ? "a" : "index").build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, + randomBoolean() ? "a" : "index").build()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, - () -> authorizationService.authorize(createAuthentication(user), action, request)); + () -> authorize(createAuthentication(user), action, request)); assertThat(illegalStateException.getMessage(), containsString("Composite actions must implement CompositeIndicesRequest")); } @@ -737,13 +763,13 @@ public class AuthorizationServiceTests extends ESTestCase { TransportRequest request = new MockIndicesRequest(); User userAllowed = new User("userAllowed", "roleAllowed"); - when(rolesStore.role("roleAllowed")).thenReturn(Role.builder("roleAllowed").add(IndexPrivilege.ALL, "index").build()); + roleMap.put("roleAllowed", Role.builder("roleAllowed").add(IndexPrivilege.ALL, "index").build()); User userDenied = new User("userDenied", "roleDenied"); - when(rolesStore.role("roleDenied")).thenReturn(Role.builder("roleDenied").add(IndexPrivilege.ALL, "a").build()); + roleMap.put("roleDenied", Role.builder("roleDenied").add(IndexPrivilege.ALL, "a").build()); mockEmptyMetaData(); - authorizationService.authorize(createAuthentication(userAllowed), action, request); + authorize(createAuthentication(userAllowed), action, request); assertThrowsAuthorizationException( - () -> authorizationService.authorize(createAuthentication(userDenied), action, request), action, "userDenied"); + () -> authorize(createAuthentication(userDenied), action, request), action, "userDenied"); } private static Tuple randomCompositeRequest() { @@ -787,7 +813,9 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testDoesNotUseRolesStoreForXPackUser() { - Collection roles = authorizationService.roles(XPackUser.INSTANCE); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(XPackUser.INSTANCE, rolesFuture); + final Collection roles = rolesFuture.actionGet(); assertThat(roles, contains(SuperuserRole.INSTANCE)); verifyZeroInteractions(rolesStore); } @@ -800,7 +828,7 @@ public class AuthorizationServiceTests extends ESTestCase { final boolean roleExists = randomBoolean(); final Role anonymousRole = Role.builder("a_all").add(IndexPrivilege.ALL, "a").build(); if (roleExists) { - when(rolesStore.role("a_all")).thenReturn(anonymousRole); + roleMap.put("a_all", anonymousRole); } final MetaData metaData = MetaData.builder() .put(new IndexMetaData.Builder("a") @@ -809,9 +837,11 @@ public class AuthorizationServiceTests extends ESTestCase { .build(); User user = new User("no_roles"); - final Collection roles = authorizationService.roles(user); + PlainActionFuture> rolesFuture = new PlainActionFuture<>(); + authorizationService.roles(user, rolesFuture); + final Collection roles = rolesFuture.actionGet(); GlobalPermission globalPermission = authorizationService.permission(roles); - verify(rolesStore).role("a_all"); + verify(rolesStore).roles(eq("a_all"), any(ActionListener.class)); if (roleExists) { assertThat(roles, containsInAnyOrder(anonymousRole, DefaultRole.INSTANCE)); @@ -835,7 +865,8 @@ public class AuthorizationServiceTests extends ESTestCase { } public void testGetRolesForSystemUserThrowsException() { - IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> authorizationService.roles(SystemUser.INSTANCE)); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> authorizationService.roles(SystemUser.INSTANCE, + null)); assertEquals("the user [_system] is the system user and we should never try to get its roles", iae.getMessage()); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java index 1160d16be71..12be38a9744 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authz; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -23,6 +24,7 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -51,6 +53,8 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; @@ -58,6 +62,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,6 +77,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { private AuthorizationService authzService; private IndicesAndAliasesResolver defaultIndicesResolver; private IndexNameExpressionResolver indexNameExpressionResolver; + private Map roleMap; @Before public void setup() { @@ -105,10 +112,18 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { rolesStore = mock(CompositeRolesStore.class); String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed"}; String[] dashIndices = new String[]{"-index10", "-index11", "-index20", "-index21"}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); - when(rolesStore.role("dash")).thenReturn(Role.builder("dash").add(IndexPrivilege.ALL, dashIndices).build()); - when(rolesStore.role("test")).thenReturn(Role.builder("test").cluster(ClusterPrivilege.MONITOR).build()); - when(rolesStore.role(SuperuserRole.NAME)).thenReturn(Role.builder(SuperuserRole.DESCRIPTOR).build()); + roleMap = new HashMap<>(); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("dash", Role.builder("dash").add(IndexPrivilege.ALL, dashIndices).build()); + roleMap.put("test", Role.builder("test").cluster(ClusterPrivilege.MONITOR).build()); + roleMap.put(SuperuserRole.NAME, Role.builder(SuperuserRole.DESCRIPTOR).build()); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(roleMap.get(i.getArguments()[0])); + return Void.TYPE; + }).when(rolesStore).roles(any(String.class), any(ActionListener.class)); + ClusterService clusterService = mock(ClusterService.class); authzService = new AuthorizationService(settings, rolesStore, clusterService, mock(AuditTrailService.class), new DefaultAuthenticationFailureHandler(), mock(ThreadPool.class), @@ -1033,7 +1048,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { public void testNonXPackUserAccessingSecurityIndex() { User allAccessUser = new User("all_access", "all_access"); - when(rolesStore.role("all_access")).thenReturn( + roleMap.put("all_access", Role.builder("all_access").add(IndexPrivilege.ALL, "*").cluster(ClusterPrivilege.ALL).build()); { @@ -1078,7 +1093,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // make the user authorized String dateTimeIndex = indexNameExpressionResolver.resolveDateMathExpression(""); String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed", dateTimeIndex}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); SearchRequest request = new SearchRequest(""); if (randomBoolean()) { @@ -1115,7 +1130,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // make the user authorized String[] authorizedIndices = new String[] { "bar", "bar-closed", "foofoobar", "foofoo", "missing", "foofoo-closed", indexNameExpressionResolver.resolveDateMathExpression("")}; - when(rolesStore.role("role")).thenReturn(Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); + roleMap.put("role", Role.builder("role").add(IndexPrivilege.ALL, authorizedIndices).build()); GetAliasesRequest request = new GetAliasesRequest("").indices("foo", "foofoo"); Set indices = defaultIndicesResolver.resolve(request, metaData, buildAuthorizedIndices(user, GetAliasesAction.NAME)); //the union of all indices and aliases gets returned @@ -1129,8 +1144,9 @@ public class IndicesAndAliasesResolverTests extends ESTestCase { // TODO with the removal of DeleteByQuery is there another way to test resolving a write action? private AuthorizedIndices buildAuthorizedIndices(User user, String action) { - Collection roles = authzService.roles(user); - return new AuthorizedIndices(user, roles, action, metaData); + PlainActionFuture> rolesListener = new PlainActionFuture<>(); + authzService.roles(user, rolesListener); + return new AuthorizedIndices(user, rolesListener.actionGet(), action, metaData); } private static IndexMetaData.Builder indexBuilder(String index) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java index f3759ad9ea5..27695c4d49b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.transport; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.TransportChannel; @@ -16,11 +18,19 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.user.SystemUser; +import org.elasticsearch.xpack.security.user.User; +import org.elasticsearch.xpack.security.user.XPackUser; import org.junit.Before; +import java.util.Collection; +import java.util.Collections; + import static org.elasticsearch.xpack.security.support.Exceptions.authenticationError; import static org.elasticsearch.xpack.security.support.Exceptions.authorizationError; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -46,16 +56,21 @@ public class ServerTransportFilterTests extends ESTestCase { public void testInbound() throws Exception { TransportRequest request = mock(TransportRequest.class); Authentication authentication = mock(Authentication.class); + when(authentication.getUser()).thenReturn(SystemUser.INSTANCE); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); - filter.inbound("_action", request, channel); - verify(authzService).authorize(authentication, "_action", request); + PlainActionFuture future = new PlainActionFuture(); + filter.inbound("_action", request, channel, future); + //future.get(); // don't block it's not called really just mocked + verify(authzService).authorize(authentication, "_action", request, Collections.emptyList(), Collections.emptyList()); } public void testInboundAuthenticationException() throws Exception { TransportRequest request = mock(TransportRequest.class); doThrow(authenticationError("authc failed")).when(authcService).authenticate("_action", request, null); try { - filter.inbound("_action", request, channel); + PlainActionFuture future = new PlainActionFuture(); + filter.inbound("_action", request, channel, future); + future.actionGet(); fail("expected filter inbound to throw an authentication exception on authentication error"); } catch (ElasticsearchSecurityException e) { assertThat(e.getMessage(), equalTo("authc failed")); @@ -67,9 +82,20 @@ public class ServerTransportFilterTests extends ESTestCase { TransportRequest request = mock(TransportRequest.class); Authentication authentication = mock(Authentication.class); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); - doThrow(authorizationError("authz failed")).when(authzService).authorize(authentication, "_action", request); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); + when(authentication.getUser()).thenReturn(XPackUser.INSTANCE); + when(authentication.getRunAsUser()).thenReturn(XPackUser.INSTANCE); + PlainActionFuture future = new PlainActionFuture(); + doThrow(authorizationError("authz failed")).when(authzService).authorize(authentication, "_action", request, + Collections.emptyList(), Collections.emptyList()); try { - filter.inbound("_action", request, channel); + filter.inbound("_action", request, channel, future); + future.actionGet(); fail("expected filter inbound to throw an authorization exception on authorization error"); } catch (ElasticsearchSecurityException e) { assertThat(e.getMessage(), equalTo("authz failed")); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java index 24cd9d22901..44014632f97 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java @@ -5,11 +5,11 @@ */ package org.elasticsearch.xpack.security.transport; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Binder; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ssl.SSLService; import org.mockito.InOrder; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -52,8 +53,10 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -113,9 +116,11 @@ public class TransportFilterTests extends ESIntegTestCase { InOrder inOrder = inOrder(sourceAuth, targetServerFilter, targetAuth, sourceServerFilter); inOrder.verify(sourceAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class)); + inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class), + any(ActionListener.class)); inOrder.verify(targetAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class)); + inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class), + any(ActionListener.class)); } public static class InternalPlugin extends Plugin { @@ -288,6 +293,7 @@ public class TransportFilterTests extends ESIntegTestCase { public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, SearchRequestParsers searchRequestParsers) { + interceptor = new InternalPluginServerTransportServiceInterceptor(clusterService.getSettings(), threadPool, authenticationService, authorizationService); return Collections.emptyList(); @@ -295,12 +301,9 @@ public class TransportFilterTests extends ESIntegTestCase { @Override public Collection createGuiceModules() { - return Collections.singleton(new Module() { - @Override - public void configure(Binder binder) { - binder.bind(AuthenticationService.class).toInstance(authenticationService); - binder.bind(AuthorizationService.class).toInstance(authorizationService); - } + return Collections.singleton((Module) binder -> { + binder.bind(AuthenticationService.class).toInstance(authenticationService); + binder.bind(AuthorizationService.class).toInstance(authorizationService); }); } @@ -327,12 +330,29 @@ public class TransportFilterTests extends ESIntegTestCase { super(settings, threadPool,authenticationService, authorizationService, mock(XPackLicenseState.class), mock(SSLService.class)); when(licenseState.isAuthAllowed()).thenReturn(true); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[3]; + callback.onResponse(null); + return Void.TYPE; + }).when(authorizationService).roles(any(), any(ActionListener.class)); } @Override protected Map initializeProfileFilters() { + ServerTransportFilter.NodeProfile mock = mock(ServerTransportFilter.NodeProfile.class); + try { + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[3]; + callback.onResponse(null); + return Void.TYPE; + }).when(mock).inbound(any(), any(), any(), any(ActionListener.class)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } return Collections.singletonMap(TransportSettings.DEFAULT_PROFILE, - mock(ServerTransportFilter.NodeProfile.class)); + mock); } } } From 7d60f6b365067e6c75132725ce772743cdd6c155 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 25 Oct 2016 13:48:28 -0400 Subject: [PATCH 2/2] security: restore the correct user when switching to the system user * security: restore the correct user when switching to the system user For internal actions where we need to switch to the SystemUser, we should always restore the proper context after execution. We were restoring an empty context for actions executed by the SystemUser in the SecurityServerTransportInterceptor. In order to accomplish this, a few changes have been made. Both the SecurityServerTransportInterceptor and the SecurityActionFilter delegate to `SecurityContext#executeAsUser` when a user switch is necessary. Tests were added for this method to ensure that the consumer is executed as the correct user and the proper user is restored. While working on this, a few other cleanups were made: * SecurityContext can never have a null CryptoService, so a null check was removed * We no longer replace the user with the system user when the system user is already associated with the request * The security transport interceptor checks the license state and if auth is not allowed, delegate and return * The security transport interceptor sendWithUser method now requires authentication to be present or a hard exception is thrown. * The TransportFilters integration test has been deleted. This was integration test that relied on the ability to get instances from a node and trace the execution. This has been replaced by additional unit tests in ServerTransportFilterTests Closes elastic/elasticsearch#3845 Original commit: elastic/x-pack-elasticsearch@d8bcb59cb757e3c79481e5b8e1da764dd210ee35 --- .../license/XPackLicenseState.java | 4 - .../xpack/security/Security.java | 2 +- .../xpack/security/SecurityContext.java | 43 ++- .../action/filter/SecurityActionFilter.java | 81 ++-- .../security/authz/AuthorizationService.java | 2 +- .../security/authz/AuthorizationUtils.java | 9 +- .../SecurityServerTransportInterceptor.java | 40 +- .../transport/ServerTransportFilter.java | 2 +- .../xpack/security/SecurityContextTests.java | 100 +++++ .../authz/AuthorizationUtilsTests.java | 12 +- ...curityServerTransportInterceptorTests.java | 211 +++++++++++ .../transport/ServerTransportFilterTests.java | 66 +++- .../transport/TransportFilterTests.java | 358 ------------------ 13 files changed, 494 insertions(+), 436 deletions(-) create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/elasticsearch/src/main/java/org/elasticsearch/license/XPackLicenseState.java index a852c98dabc..ebfba1b19ad 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/elasticsearch/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -15,11 +15,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.License.OperationMode; import org.elasticsearch.xpack.XPackPlugin; -import org.elasticsearch.xpack.graph.Graph; -import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.MonitoringSettings; -import org.elasticsearch.xpack.security.Security; -import org.elasticsearch.xpack.watcher.Watcher; /** * A holder for the current state of the license for all xpack features. diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java index c36cc323c7d..f7857a92189 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -334,7 +334,7 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { ipFilter.set(new IPFilter(settings, auditTrailService, clusterService.getClusterSettings(), licenseState)); components.add(ipFilter.get()); securityIntercepter.set(new SecurityServerTransportInterceptor(settings, threadPool, authcService, authzService, licenseState, - sslService)); + sslService, securityContext)); return components; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java index a4d9e177bdd..d47af3d63d9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; +import org.elasticsearch.node.Node; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authc.AuthenticationService; @@ -16,6 +18,8 @@ import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.security.user.User; import java.io.IOException; +import java.util.Objects; +import java.util.function.Consumer; /** * A lightweight utility that can find the current user and authentication information for the local thread. @@ -26,6 +30,7 @@ public class SecurityContext { private final ThreadContext threadContext; private final CryptoService cryptoService; private final boolean signUserHeader; + private final String nodeName; /** * Creates a new security context. @@ -37,6 +42,7 @@ public class SecurityContext { this.threadContext = threadPool.getThreadContext(); this.cryptoService = cryptoService; this.signUserHeader = AuthenticationService.SIGN_USER_HEADER.get(settings); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); } /** Returns the current user information, or null if the current request has no authentication info. */ @@ -47,9 +53,6 @@ public class SecurityContext { /** Returns the authentication information, or null if the current request has no authentication info. */ public Authentication getAuthentication() { - if (cryptoService == null) { - return null; - } try { return Authentication.readFromContext(threadContext, cryptoService, signUserHeader); } catch (IOException e) { @@ -59,4 +62,38 @@ public class SecurityContext { return null; } } + + /** + * Sets the user forcefully to the provided user. There must not be an existing user in the ThreadContext otherwise an exception + * will be thrown. This method is package private for testing. + */ + void setUser(User user) { + Objects.requireNonNull(user); + final Authentication.RealmRef lookedUpBy; + if (user.runAs() == null) { + lookedUpBy = null; + } else { + lookedUpBy = new Authentication.RealmRef("__attach", "__attach", nodeName); + } + + try { + Authentication authentication = + new Authentication(user, new Authentication.RealmRef("__attach", "__attach", nodeName), lookedUpBy); + authentication.writeToContext(threadContext, cryptoService, signUserHeader); + } catch (IOException e) { + throw new AssertionError("how can we have a IOException with a user we set", e); + } + } + + /** + * Runs the consumer in a new context as the provided user. The original constext is provided to the consumer. When this method + * returns, the original context is restored. + */ + public void executeAsUser(User user, Consumer consumer) { + final StoredContext original = threadContext.newStoredContext(); + try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { + setUser(user); + consumer.accept(original); + } + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java index 2a1825f92f5..f7039716de3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java @@ -93,45 +93,51 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil throw LicenseUtils.newComplianceException(XPackPlugin.SECURITY); } - // only restore the context if it is not empty. This is needed because sometimes a response is sent to the user - // and then a cleanup action is executed (like for search without a scroll) - final ThreadContext.StoredContext original = threadContext.newStoredContext(); - final boolean restoreOriginalContext = securityContext.getAuthentication() != null; - try { - if (licenseState.isAuthAllowed()) { - final boolean useSystemUser = AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action); - // we should always restore the original here because we forcefully changed to the system user - final ThreadContext.StoredContext toRestore = restoreOriginalContext || useSystemUser ? original : () -> {}; - final ActionListener signingListener = new ContextPreservingActionListener<>(toRestore, - ActionListener.wrap(r -> { - try { - listener.onResponse(sign(r)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, listener::onFailure)); - ActionListener authenticatedListener = new ActionListener() { - @Override - public void onResponse(Void aVoid) { - chain.proceed(task, action, request, signingListener); - } - @Override - public void onFailure(Exception e) { - signingListener.onFailure(e); - } - }; - if (useSystemUser) { - try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { - applyInternal(action, request, authenticatedListener); - } - } else { - applyInternal(action, request, authenticatedListener); - } - } else if (SECURITY_ACTION_MATCHER.test(action)) { - throw LicenseUtils.newComplianceException(XPackPlugin.SECURITY); + if (licenseState.isAuthAllowed() == false) { + if (SECURITY_ACTION_MATCHER.test(action)) { + // TODO we should be nice and just call the listener + listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.SECURITY)); } else { chain.proceed(task, action, request, listener); } + return; + } + + // only restore the context if it is not empty. This is needed because sometimes a response is sent to the user + // and then a cleanup action is executed (like for search without a scroll) + final boolean restoreOriginalContext = securityContext.getAuthentication() != null; + final boolean useSystemUser = AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action); + // we should always restore the original here because we forcefully changed to the system user + final ThreadContext.StoredContext toRestore = restoreOriginalContext || useSystemUser ? threadContext.newStoredContext() : () -> {}; + final ActionListener signingListener = new ContextPreservingActionListener<>(toRestore, ActionListener.wrap(r -> { + try { + listener.onResponse(sign(r)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, listener::onFailure)); + ActionListener authenticatedListener = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + chain.proceed(task, action, request, signingListener); + } + @Override + public void onFailure(Exception e) { + signingListener.onFailure(e); + } + }; + try { + if (useSystemUser) { + securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> { + try { + applyInternal(action, request, authenticatedListener); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } else { + applyInternal(action, request, authenticatedListener); + } } catch (Exception e) { listener.onFailure(e); } @@ -147,8 +153,7 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil return Integer.MIN_VALUE; } - private void applyInternal(String action, final ActionRequest request, ActionListener listener) - throws IOException { + private void applyInternal(String action, final ActionRequest request, ActionListener listener) throws IOException { /** here we fallback on the system user. Internal system requests are requests that are triggered by the system itself (e.g. pings, update mappings, share relocation, etc...) and were not originated diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 3896b37612e..170464eaf43 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -67,7 +67,7 @@ public class AuthorizationService extends AbstractComponent { public static final Setting ANONYMOUS_AUTHORIZATION_EXCEPTION_SETTING = Setting.boolSetting(setting("authc.anonymous.authz_exception"), true, Property.NodeScope); public static final String INDICES_PERMISSIONS_KEY = "_indices_permissions"; - static final String ORIGINATING_ACTION_KEY = "_originating_action_name"; + public static final String ORIGINATING_ACTION_KEY = "_originating_action_name"; private static final Predicate MONITOR_INDEX_PREDICATE = IndexPrivilege.MONITOR.predicate(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 9d126f1121d..f4ba6cb9fb8 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -10,9 +10,9 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.xpack.security.authc.Authentication; import org.elasticsearch.xpack.security.authz.permission.Role; -import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.support.AutomatonPredicate; import org.elasticsearch.xpack.security.support.Automatons; +import org.elasticsearch.xpack.security.user.SystemUser; import java.util.Collection; import java.util.Collections; @@ -29,11 +29,10 @@ public final class AuthorizationUtils { * This method is used to determine if a request should be executed as the system user, even if the request already * has a user associated with it. * - * In order for the system user to be used, one of the following conditions must be true: + * In order for the user to be replaced by the system user one of the following conditions must be true: * *
    *
  • the action is an internal action and no user is associated with the request
  • - *
  • the action is an internal action and the system user is already associated with the request
  • *
  • the action is an internal action and the thread context contains a non-internal action as the originating action
  • *
* @@ -47,7 +46,7 @@ public final class AuthorizationUtils { } Authentication authentication = threadContext.getTransient(Authentication.AUTHENTICATION_KEY); - if (authentication == null || SystemUser.is(authentication.getUser())) { + if (authentication == null) { return true; } @@ -62,7 +61,7 @@ public final class AuthorizationUtils { return false; } - public static boolean isInternalAction(String action) { + private static boolean isInternalAction(String action) { return INTERNAL_PREDICATE.test(action); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index 5c390dd8db3..18a4d8476cd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.xpack.security.SecurityContext; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authz.AuthorizationService; import org.elasticsearch.xpack.security.authz.AuthorizationUtils; @@ -50,16 +51,18 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor private final AuthorizationService authzService; private final SSLService sslService; private final Map profileFilters; - final XPackLicenseState licenseState; + private final XPackLicenseState licenseState; private final ThreadPool threadPool; private final Settings settings; + private final SecurityContext securityContext; public SecurityServerTransportInterceptor(Settings settings, ThreadPool threadPool, AuthenticationService authcService, AuthorizationService authzService, XPackLicenseState licenseState, - SSLService sslService) { + SSLService sslService, + SecurityContext securityContext) { this.settings = settings; this.threadPool = threadPool; this.authcService = authcService; @@ -67,6 +70,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor this.licenseState = licenseState; this.sslService = sslService; this.profileFilters = initializeProfileFilters(); + this.securityContext = securityContext; } @Override @@ -75,15 +79,17 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor @Override public void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { - // Sometimes a system action gets executed like a internal create index request or update mappings request - // which means that the user is copied over to system actions so we need to change the user - if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { - try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { - final ThreadContext.StoredContext original = threadPool.getThreadContext().newStoredContext(); - sendWithUser(node, action, request, options, new ContextRestoreResponseHandler<>(original, handler), sender); + if (licenseState.isAuthAllowed()) { + // Sometimes a system action gets executed like a internal create index request or update mappings request + // which means that the user is copied over to system actions so we need to change the user + if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { + securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(node, action, request, options, + new ContextRestoreResponseHandler<>(original, handler), sender)); + } else { + sendWithUser(node, action, request, options, handler, sender); } } else { - sendWithUser(node, action, request, options, handler, sender); + sender.sendRequest(node, action, request, options, handler); } } }; @@ -92,11 +98,12 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor private void sendWithUser(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler, AsyncSender sender) { + // There cannot be a request outgoing from this node that is not associated with a user. + if (securityContext.getAuthentication() == null) { + throw new IllegalStateException("there should always be a user when sending a message"); + } + try { - // this will check if there's a user associated with the request. If there isn't, - // the system user will be attached. There cannot be a request outgoing from this - // node that is not associated with a user. - authcService.attachUserIfMissing(SystemUser.INSTANCE); sender.sendRequest(node, action, request, options, handler); } catch (Exception e) { handler.handleException(new TransportException("failed sending request", e)); @@ -248,14 +255,15 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor } /** - * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the4 handle methods + * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the handle methods * are invoked we restore the context. */ - private static final class ContextRestoreResponseHandler implements TransportResponseHandler { + static final class ContextRestoreResponseHandler implements TransportResponseHandler { private final TransportResponseHandler delegate; private final ThreadContext.StoredContext threadContext; - private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler delegate) { + // pkg private for testing + ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler delegate) { this.delegate = delegate; this.threadContext = threadContext; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 500ce9e5b3c..de8306d0b23 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -151,7 +151,7 @@ public interface ServerTransportFilter { public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener listener) throws IOException { // TODO is ']' sufficient to mark as shard action? - boolean isInternalOrShardAction = action.startsWith("internal:") || action.endsWith("]"); + final boolean isInternalOrShardAction = action.startsWith("internal:") || action.endsWith("]"); if (isInternalOrShardAction) { throw authenticationError("executing internal/shard actions is considered malicious and forbidden"); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java new file mode 100644 index 00000000000..41a462ae2ab --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; +import org.elasticsearch.env.Environment; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authc.Authentication.RealmRef; +import org.elasticsearch.xpack.security.authc.AuthenticationService; +import org.elasticsearch.xpack.security.crypto.CryptoService; +import org.elasticsearch.xpack.security.user.SystemUser; +import org.elasticsearch.xpack.security.user.User; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SecurityContextTests extends ESTestCase { + + private boolean signHeader; + private Settings settings; + private ThreadContext threadContext; + private CryptoService cryptoService; + private SecurityContext securityContext; + + @Before + public void buildSecurityContext() throws IOException { + signHeader = randomBoolean(); + settings = Settings.builder() + .put("path.home", createTempDir()) + .put(AuthenticationService.SIGN_USER_HEADER.getKey(), signHeader) + .build(); + threadContext = new ThreadContext(settings); + cryptoService = new CryptoService(settings, new Environment(settings)); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(threadContext); + securityContext = new SecurityContext(settings, threadPool, cryptoService); + } + + public void testGetAuthenticationAndUserInEmptyContext() throws IOException { + assertNull(securityContext.getAuthentication()); + assertNull(securityContext.getUser()); + } + + public void testGetAuthenticationAndUser() throws IOException { + final User user = new User("test"); + final Authentication authentication = new Authentication(user, new RealmRef("ldap", "foo", "node1"), null); + authentication.writeToContext(threadContext, cryptoService, signHeader); + + assertEquals(authentication, securityContext.getAuthentication()); + assertEquals(user, securityContext.getUser()); + } + + public void testSetUser() { + final User user = new User("test"); + assertNull(securityContext.getAuthentication()); + assertNull(securityContext.getUser()); + securityContext.setUser(user); + assertEquals(user, securityContext.getUser()); + + IllegalStateException e = expectThrows(IllegalStateException.class, + () -> securityContext.setUser(randomFrom(user, SystemUser.INSTANCE))); + assertEquals("authentication is already present in the context", e.getMessage()); + } + + public void testExecuteAsUser() throws IOException { + final User original; + if (randomBoolean()) { + original = new User("test"); + final Authentication authentication = new Authentication(original, new RealmRef("ldap", "foo", "node1"), null); + authentication.writeToContext(threadContext, cryptoService, signHeader); + } else { + original = null; + } + + final User executionUser = new User("executor"); + final AtomicReference contextAtomicReference = new AtomicReference<>(); + securityContext.executeAsUser(executionUser, (originalCtx) -> { + assertEquals(executionUser, securityContext.getUser()); + contextAtomicReference.set(originalCtx); + }); + + final User userAfterExecution = securityContext.getUser(); + assertEquals(original, userAfterExecution); + StoredContext originalContext = contextAtomicReference.get(); + assertNotNull(originalContext); + originalContext.restore(); + assertEquals(original, securityContext.getUser()); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java index fdaa0eb50e5..76f580baeef 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java @@ -32,11 +32,13 @@ public class AuthorizationUtilsTests extends ESTestCase { assertThat(AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, randomFrom("indices:foo", "cluster:bar")), is(false)); } - public void testSystemUserSwitchWithNullorSystemUser() { - if (randomBoolean()) { - threadContext.putTransient(Authentication.AUTHENTICATION_KEY, - new Authentication(SystemUser.INSTANCE, new RealmRef("test", "test", "foo"), null)); - } + public void testSystemUserSwitchWithSystemUser() { + threadContext.putTransient(Authentication.AUTHENTICATION_KEY, + new Authentication(SystemUser.INSTANCE, new RealmRef("test", "test", "foo"), null)); + assertThat(AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, "internal:something"), is(false)); + } + + public void testSystemUserSwitchWithNullUser() { assertThat(AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, "internal:something"), is(true)); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java new file mode 100644 index 00000000000..6a1e9c23214 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -0,0 +1,211 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.env.Environment; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor.AsyncSender; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.xpack.security.SecurityContext; +import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authc.Authentication.RealmRef; +import org.elasticsearch.xpack.security.authc.AuthenticationService; +import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.crypto.CryptoService; +import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor.ContextRestoreResponseHandler; +import org.elasticsearch.xpack.security.user.SystemUser; +import org.elasticsearch.xpack.security.user.User; +import org.elasticsearch.xpack.ssl.SSLService; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class SecurityServerTransportInterceptorTests extends ESTestCase { + + private Settings settings; + private ThreadPool threadPool; + private ThreadContext threadContext; + private XPackLicenseState xPackLicenseState; + private CryptoService cryptoService; + private SecurityContext securityContext; + + @Override + public void setUp() throws Exception { + super.setUp(); + settings = Settings.builder().put("path.home", createTempDir()).build(); + threadPool = mock(ThreadPool.class); + threadContext = new ThreadContext(settings); + when(threadPool.getThreadContext()).thenReturn(threadContext); + cryptoService = new CryptoService(settings, new Environment(settings)); + securityContext = spy(new SecurityContext(settings, threadPool, cryptoService)); + xPackLicenseState = mock(XPackLicenseState.class); + when(xPackLicenseState.isAuthAllowed()).thenReturn(true); + } + + public void testSendAsyncUnlicensed() { + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext); + when(xPackLicenseState.isAuthAllowed()).thenReturn(false); + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AsyncSender sender = interceptor.interceptSender(new AsyncSender() { + @Override + public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + } + }); + sender.sendRequest(null, null, null, null, null); + assertTrue(calledWrappedSender.get()); + verify(xPackLicenseState).isAuthAllowed(); + verifyNoMoreInteractions(xPackLicenseState); + verifyZeroInteractions(securityContext); + } + + public void testSendAsync() throws Exception { + final User user = new User("test"); + final Authentication authentication = new Authentication(user, new RealmRef("ldap", "foo", "node1"), null); + authentication.writeToContext(threadContext, cryptoService, AuthenticationService.SIGN_USER_HEADER.get(settings)); + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext); + + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); + AsyncSender sender = interceptor.interceptSender(new AsyncSender() { + @Override + public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + sendingUser.set(securityContext.getUser()); + } + }); + sender.sendRequest(null, "indices:foo", null, null, null); + assertTrue(calledWrappedSender.get()); + assertEquals(user, sendingUser.get()); + assertEquals(user, securityContext.getUser()); + verify(xPackLicenseState).isAuthAllowed(); + verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class)); + verifyNoMoreInteractions(xPackLicenseState); + } + + public void testSendAsyncSwitchToSystem() throws Exception { + final User user = new User("test"); + final Authentication authentication = new Authentication(user, new RealmRef("ldap", "foo", "node1"), null); + authentication.writeToContext(threadContext, cryptoService, AuthenticationService.SIGN_USER_HEADER.get(settings)); + threadContext.putTransient(AuthorizationService.ORIGINATING_ACTION_KEY, "indices:foo"); + + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext); + + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); + AsyncSender sender = interceptor.interceptSender(new AsyncSender() { + @Override + public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + sendingUser.set(securityContext.getUser()); + } + }); + sender.sendRequest(null, "internal:foo", null, null, null); + assertTrue(calledWrappedSender.get()); + assertNotEquals(user, sendingUser.get()); + assertEquals(SystemUser.INSTANCE, sendingUser.get()); + assertEquals(user, securityContext.getUser()); + verify(xPackLicenseState).isAuthAllowed(); + verify(securityContext).executeAsUser(any(User.class), any(Consumer.class)); + verifyNoMoreInteractions(xPackLicenseState); + } + + public void testSendWithoutUser() throws Exception { + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext); + + assertNull(securityContext.getUser()); + AsyncSender sender = interceptor.interceptSender(new AsyncSender() { + @Override + public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + fail("sender should not be called!"); + } + }); + IllegalStateException e = + expectThrows(IllegalStateException.class, () -> sender.sendRequest(null, "indices:foo", null, null, null)); + assertEquals("there should always be a user when sending a message", e.getMessage()); + assertNull(securityContext.getUser()); + verify(xPackLicenseState).isAuthAllowed(); + verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class)); + verifyNoMoreInteractions(xPackLicenseState); + } + + public void testContextRestoreResponseHandler() throws Exception { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + threadContext.putTransient("foo", "bar"); + threadContext.putHeader("key", "value"); + try (ThreadContext.StoredContext storedContext = threadContext.stashContext()) { + threadContext.putTransient("foo", "different_bar"); + threadContext.putHeader("key", "value2"); + TransportResponseHandler handler = new ContextRestoreResponseHandler<>(storedContext, + new TransportResponseHandler() { + + @Override + public Empty newInstance() { + return Empty.INSTANCE; + } + + @Override + public void handleResponse(Empty response) { + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("value", threadContext.getHeader("key")); + } + + @Override + public void handleException(TransportException exp) { + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("value", threadContext.getHeader("key")); + } + + @Override + public String executor() { + return null; + } + }); + + handler.handleResponse(null); + handler.handleException(null); + } + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java index 27695c4d49b..80e496da697 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java @@ -12,12 +12,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.xpack.security.authc.Authentication; -import org.elasticsearch.xpack.security.action.SecurityActionMapper; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.xpack.security.authc.Authentication.RealmRef; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authz.AuthorizationService; +import org.elasticsearch.xpack.security.authz.permission.Role; +import org.elasticsearch.xpack.security.authz.permission.SuperuserRole; import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.XPackUser; @@ -26,21 +28,23 @@ import org.junit.Before; import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.xpack.security.support.Exceptions.authenticationError; import static org.elasticsearch.xpack.security.support.Exceptions.authorizationError; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class ServerTransportFilterTests extends ESTestCase { private AuthenticationService authcService; private AuthorizationService authzService; - private ServerTransportFilter filter; private TransportChannel channel; @Before @@ -49,8 +53,6 @@ public class ServerTransportFilterTests extends ESTestCase { authzService = mock(AuthorizationService.class); channel = mock(TransportChannel.class); when(channel.getProfileName()).thenReturn(TransportSettings.DEFAULT_PROFILE); - filter = new ServerTransportFilter.NodeProfile(authcService, authzService, - new ThreadContext(Settings.EMPTY), false); } public void testInbound() throws Exception { @@ -58,6 +60,7 @@ public class ServerTransportFilterTests extends ESTestCase { Authentication authentication = mock(Authentication.class); when(authentication.getUser()).thenReturn(SystemUser.INSTANCE); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); + ServerTransportFilter filter = getClientOrNodeFilter(); PlainActionFuture future = new PlainActionFuture(); filter.inbound("_action", request, channel, future); //future.get(); // don't block it's not called really just mocked @@ -67,6 +70,7 @@ public class ServerTransportFilterTests extends ESTestCase { public void testInboundAuthenticationException() throws Exception { TransportRequest request = mock(TransportRequest.class); doThrow(authenticationError("authc failed")).when(authcService).authenticate("_action", request, null); + ServerTransportFilter filter = getClientOrNodeFilter(); try { PlainActionFuture future = new PlainActionFuture(); filter.inbound("_action", request, channel, future); @@ -79,6 +83,7 @@ public class ServerTransportFilterTests extends ESTestCase { } public void testInboundAuthorizationException() throws Exception { + ServerTransportFilter filter = getClientOrNodeFilter(); TransportRequest request = mock(TransportRequest.class); Authentication authentication = mock(Authentication.class); when(authcService.authenticate("_action", request, null)).thenReturn(authentication); @@ -101,4 +106,57 @@ public class ServerTransportFilterTests extends ESTestCase { assertThat(e.getMessage(), equalTo("authz failed")); } } + + public void testClientProfileRejectsNodeActions() throws Exception { + TransportRequest request = mock(TransportRequest.class); + ServerTransportFilter filter = getClientFilter(); + ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class, + () -> filter.inbound("internal:foo/bar", request, channel, new PlainActionFuture<>())); + assertEquals("executing internal/shard actions is considered malicious and forbidden", e.getMessage()); + e = expectThrows(ElasticsearchSecurityException.class, + () -> filter.inbound("indices:action" + randomFrom("[s]", "[p]", "[r]", "[n]", "[s][p]", "[s][r]", "[f]"), + request, channel, new PlainActionFuture<>())); + assertEquals("executing internal/shard actions is considered malicious and forbidden", e.getMessage()); + verifyZeroInteractions(authcService); + } + + public void testNodeProfileAllowsNodeActions() throws Exception { + final String internalAction = "internal:foo/bar"; + final String nodeOrShardAction = "indices:action" + randomFrom("[s]", "[p]", "[r]", "[n]", "[s][p]", "[s][r]", "[f]"); + ServerTransportFilter filter = getNodeFilter(); + TransportRequest request = mock(TransportRequest.class); + Authentication authentication = new Authentication(new User("test", "superuser"), new RealmRef("test", "test", "node1"), null); + final Collection userRoles = Collections.singletonList(SuperuserRole.INSTANCE); + doAnswer((i) -> { + ActionListener callback = + (ActionListener) i.getArguments()[1]; + callback.onResponse(authentication.getUser().equals(i.getArguments()[0]) ? userRoles : Collections.emptyList()); + return Void.TYPE; + }).when(authzService).roles(any(User.class), any(ActionListener.class)); + when(authcService.authenticate(internalAction, request, null)).thenReturn(authentication); + when(authcService.authenticate(nodeOrShardAction, request, null)).thenReturn(authentication); + + filter.inbound(internalAction, request, channel, new PlainActionFuture<>()); + verify(authcService).authenticate(internalAction, request, null); + verify(authzService).roles(eq(authentication.getUser()), any(ActionListener.class)); + verify(authzService).authorize(authentication, internalAction, request, userRoles, Collections.emptyList()); + + filter.inbound(nodeOrShardAction, request, channel, new PlainActionFuture<>()); + verify(authcService).authenticate(nodeOrShardAction, request, null); + verify(authzService, times(2)).roles(eq(authentication.getUser()), any(ActionListener.class)); + verify(authzService).authorize(authentication, nodeOrShardAction, request, userRoles, Collections.emptyList()); + verifyNoMoreInteractions(authcService, authzService); + } + + private ServerTransportFilter getClientOrNodeFilter() { + return randomBoolean() ? getNodeFilter() : getClientFilter(); + } + + private ServerTransportFilter.ClientProfile getClientFilter() { + return new ServerTransportFilter.ClientProfile(authcService, authzService, new ThreadContext(Settings.EMPTY), false); + } + + private ServerTransportFilter.NodeProfile getNodeFilter() { + return new ServerTransportFilter.NodeProfile(authcService, authzService, new ThreadContext(Settings.EMPTY), false); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java deleted file mode 100644 index 44014632f97..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/security/transport/TransportFilterTests.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.security.transport; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.NetworkPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.SearchRequestParsers; -import org.elasticsearch.transport.MockTcpTransportPlugin; -import org.elasticsearch.transport.TransportInterceptor; -import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.security.authc.AuthenticationService; -import org.elasticsearch.xpack.security.authz.AuthorizationService; -import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESIntegTestCase.ClusterScope; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.TransportSettings; -import org.elasticsearch.xpack.security.user.SystemUser; -import org.elasticsearch.xpack.ssl.SSLService; -import org.mockito.InOrder; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@ClusterScope(scope = SUITE, numDataNodes = 0) -public class TransportFilterTests extends ESIntegTestCase { - - @Override - protected Collection> getMockPlugins() { - return Collections.emptyList(); - } - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(InternalPluginServerTransportServiceInterceptor.TestPlugin.class, MockTcpTransportPlugin.class); - } - - @Override - protected Collection> transportClientPlugins() { - return Collections.singleton(MockTcpTransportPlugin.class); - } - - public void test() throws Exception { - String source = internalCluster().startNode(); - DiscoveryNode sourceNode = internalCluster().getInstance(ClusterService.class, source).localNode(); - TransportService sourceService = internalCluster().getInstance(TransportService.class, source); - - InternalPluginServerTransportServiceInterceptor sourceInterceptor = internalCluster().getInstance(PluginsService.class, source) - .filterPlugins(InternalPluginServerTransportServiceInterceptor.TestPlugin.class).stream().findFirst().get().interceptor; - - String target = internalCluster().startNode(); - DiscoveryNode targetNode = internalCluster().getInstance(ClusterService.class, target).localNode(); - TransportService targetService = internalCluster().getInstance(TransportService.class, target); - - InternalPluginServerTransportServiceInterceptor targetInterceptor = internalCluster().getInstance(PluginsService.class, target) - .filterPlugins(InternalPluginServerTransportServiceInterceptor.TestPlugin.class).stream().findFirst().get().interceptor; - - CountDownLatch latch = new CountDownLatch(2); - targetService.registerRequestHandler("_action", Request::new, ThreadPool.Names.SAME, - new RequestHandler(new Response("trgt_to_src"), latch)); - sourceService.sendRequest(targetNode, "_action", new Request("src_to_trgt"), - new ResponseHandler(new Response("trgt_to_src"), latch)); - await(latch); - - latch = new CountDownLatch(2); - sourceService.registerRequestHandler("_action", Request::new, ThreadPool.Names.SAME, - new RequestHandler(new Response("src_to_trgt"), latch)); - targetService.sendRequest(sourceNode, "_action", new Request("trgt_to_src"), - new ResponseHandler(new Response("src_to_trgt"), latch)); - await(latch); - - ServerTransportFilter sourceServerFilter = sourceInterceptor.transportFilter(TransportSettings.DEFAULT_PROFILE); - ServerTransportFilter targetServerFilter = targetInterceptor.transportFilter(TransportSettings.DEFAULT_PROFILE); - - AuthenticationService sourceAuth = internalCluster().getInstance(AuthenticationService.class, source); - AuthenticationService targetAuth = internalCluster().getInstance(AuthenticationService.class, target); - - InOrder inOrder = inOrder(sourceAuth, targetServerFilter, targetAuth, sourceServerFilter); - inOrder.verify(sourceAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class), - any(ActionListener.class)); - inOrder.verify(targetAuth).attachUserIfMissing(SystemUser.INSTANCE); - inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class), - any(ActionListener.class)); - } - - public static class InternalPlugin extends Plugin { - @Override - public Collection createGuiceModules() { - return Collections.singletonList(new TestTransportFilterModule()); - } - } - - public static class TestTransportFilterModule extends AbstractModule { - @Override - protected void configure() { - bind(AuthenticationService.class).toInstance(mock(AuthenticationService.class)); - bind(AuthorizationService.class).toInstance(mock(AuthorizationService.class)); - } - } - - public static class Request extends TransportRequest { - private String msg; - - public Request() { - } - - Request(String msg) { - this.msg = msg; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - msg = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(msg); - } - - @Override - public String toString() { - return msg; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Request request = (Request) o; - - if (!msg.equals(request.msg)) return false; - - return true; - } - - @Override - public int hashCode() { - return msg.hashCode(); - } - } - - static class Response extends TransportResponse { - - private String msg; - - Response() { - } - - Response(String msg) { - this.msg = msg; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - msg = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(msg); - } - - @Override - public String toString() { - return msg; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Response response = (Response) o; - - if (!msg.equals(response.msg)) return false; - - return true; - } - - @Override - public int hashCode() { - return msg.hashCode(); - } - } - - static class RequestHandler implements TransportRequestHandler { - private final Response response; - private final CountDownLatch latch; - - RequestHandler(Response response, CountDownLatch latch) { - this.response = response; - this.latch = latch; - } - - @Override - public void messageReceived(Request request, TransportChannel channel) throws Exception { - channel.sendResponse(response); - latch.countDown(); - } - } - - class ResponseHandler implements TransportResponseHandler { - private final Response response; - private final CountDownLatch latch; - - ResponseHandler(Response response, CountDownLatch latch) { - this.response = response; - this.latch = latch; - } - - @Override - public Response newInstance() { - return new Response(); - } - - @Override - public void handleResponse(Response response) { - assertThat(response, equalTo(this.response)); - latch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - logger.error("execution of request failed", exp); - fail("execution of request failed"); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - - private static void await(CountDownLatch latch) throws Exception { - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for request"); - } - } - - // Sub class the security transport to always inject a mock for testing - public static class InternalPluginServerTransportServiceInterceptor extends SecurityServerTransportInterceptor { - public static class TestPlugin extends Plugin implements NetworkPlugin { - AuthenticationService authenticationService = mock(AuthenticationService.class); - AuthorizationService authorizationService = mock(AuthorizationService.class); - InternalPluginServerTransportServiceInterceptor interceptor; - @Override - public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, ScriptService scriptService, - SearchRequestParsers searchRequestParsers) { - - interceptor = new InternalPluginServerTransportServiceInterceptor(clusterService.getSettings(), threadPool, - authenticationService, authorizationService); - return Collections.emptyList(); - } - - @Override - public Collection createGuiceModules() { - return Collections.singleton((Module) binder -> { - binder.bind(AuthenticationService.class).toInstance(authenticationService); - binder.bind(AuthorizationService.class).toInstance(authorizationService); - }); - } - - @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { - return Collections.singletonList(new TransportInterceptor() { - @Override - public TransportRequestHandler interceptHandler(String action, String executor, - TransportRequestHandler actualHandler) { - return interceptor.interceptHandler(action, executor, actualHandler); - } - - @Override - public AsyncSender interceptSender(AsyncSender sender) { - return interceptor.interceptSender(sender); - } - }); - } - } - - public InternalPluginServerTransportServiceInterceptor(Settings settings, ThreadPool threadPool, - AuthenticationService authenticationService, - AuthorizationService authorizationService) { - super(settings, threadPool,authenticationService, authorizationService, mock(XPackLicenseState.class), - mock(SSLService.class)); - when(licenseState.isAuthAllowed()).thenReturn(true); - doAnswer((i) -> { - ActionListener callback = - (ActionListener) i.getArguments()[3]; - callback.onResponse(null); - return Void.TYPE; - }).when(authorizationService).roles(any(), any(ActionListener.class)); - } - - @Override - protected Map initializeProfileFilters() { - ServerTransportFilter.NodeProfile mock = mock(ServerTransportFilter.NodeProfile.class); - try { - doAnswer((i) -> { - ActionListener callback = - (ActionListener) i.getArguments()[3]; - callback.onResponse(null); - return Void.TYPE; - }).when(mock).inbound(any(), any(), any(), any(ActionListener.class)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return Collections.singletonMap(TransportSettings.DEFAULT_PROFILE, - mock); - } - } -}