diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 0db4d718709..beb2819f2e6 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -68,6 +68,7 @@ import java.util.function.ToLongBiFunction; * @param The type of the values */ public class Cache { + // positive if entries have an expiration private long expireAfterAccessNanos = -1; @@ -282,6 +283,39 @@ public class Cache { } } + /** + * remove an entry from the segment iff the future is done and the value is equal to the + * expected value + * + * @param key the key of the entry to remove from the cache + * @param value the value expected to be associated with the key + * @param onRemoval a callback for the removed entry + */ + void remove(K key, V value, Consumer>> onRemoval) { + CompletableFuture> future; + boolean removed = false; + try (ReleasableLock ignored = writeLock.acquire()) { + future = map.get(key); + try { + if (future != null) { + if (future.isDone()) { + Entry entry = future.get(); + if (Objects.equals(value, entry.value)) { + removed = map.remove(key, future); + } + } + } + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + if (future != null && removed) { + segmentStats.eviction(); + onRemoval.accept(future); + } + } + private static class SegmentStats { private final LongAdder hits = new LongAdder(); private final LongAdder misses = new LongAdder(); @@ -314,7 +348,7 @@ public class Cache { Entry tail; // lock protecting mutations to the LRU list - private ReleasableLock lruLock = new ReleasableLock(new ReentrantLock()); + private final ReleasableLock lruLock = new ReleasableLock(new ReentrantLock()); /** * Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key. @@ -455,6 +489,19 @@ public class Cache { } } + private final Consumer>> invalidationConsumer = f -> { + try { + Entry entry = f.get(); + try (ReleasableLock ignored = lruLock.acquire()) { + delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + } + } catch (ExecutionException e) { + // ok + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }; + /** * Invalidate the association for the specified key. A removal notification will be issued for invalidated * entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. @@ -463,18 +510,20 @@ public class Cache { */ public void invalidate(K key) { CacheSegment segment = getCacheSegment(key); - segment.remove(key, f -> { - try { - Entry entry = f.get(); - try (ReleasableLock ignored = lruLock.acquire()) { - delete(entry, RemovalNotification.RemovalReason.INVALIDATED); - } - } catch (ExecutionException e) { - // ok - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }); + segment.remove(key, invalidationConsumer); + } + + /** + * Invalidate the entry for the specified key and value. If the value provided is not equal to the value in + * the cache, no removal will occur. A removal notification will be issued for invalidated + * entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * + * @param key the key whose mapping is to be invalidated from the cache + * @param value the expected value that should be associated with the key + */ + public void invalidate(K key, V value) { + CacheSegment segment = getCacheSegment(key); + segment.remove(key, value, invalidationConsumer); } /** @@ -625,7 +674,7 @@ public class Cache { Entry entry = current; if (entry != null) { CacheSegment segment = getCacheSegment(entry.key); - segment.remove(entry.key, f -> {}); + segment.remove(entry.key, entry.value, f -> {}); try (ReleasableLock ignored = lruLock.acquire()) { current = null; delete(entry, RemovalNotification.RemovalReason.INVALIDATED); @@ -710,7 +759,7 @@ public class Cache { CacheSegment segment = getCacheSegment(entry.key); if (segment != null) { - segment.remove(entry.key, f -> {}); + segment.remove(entry.key, entry.value, f -> {}); } delete(entry, RemovalNotification.RemovalReason.EVICTED); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java new file mode 100644 index 00000000000..d50f57aaafa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Tuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A future implementation that allows for the result to be passed to listeners waiting for + * notification. This is useful for cases where a computation is requested many times + * concurrently, but really only needs to be performed a single time. Once the computation + * has been performed the registered listeners will be notified by submitting a runnable + * for execution in the provided {@link ExecutorService}. If the computation has already + * been performed, a request to add a listener will simply result in execution of the listener + * on the calling thread. + */ +public final class ListenableFuture extends BaseFuture implements ActionListener { + + private volatile boolean done = false; + private final List, ExecutorService>> listeners = new ArrayList<>(); + + /** + * Adds a listener to this future. If the future has not yet completed, the listener will be + * notified of a response or exception in a runnable submitted to the ExecutorService provided. + * If the future has completed, the listener will be notified immediately without forking to + * a different thread. + */ + public void addListener(ActionListener listener, ExecutorService executor) { + if (done) { + // run the callback directly, we don't hold the lock and don't need to fork! + notifyListener(listener, EsExecutors.newDirectExecutorService()); + } else { + final boolean run; + // check done under lock since it could have been modified and protect modifications + // to the list under lock + synchronized (this) { + if (done) { + run = true; + } else { + listeners.add(new Tuple<>(listener, executor)); + run = false; + } + } + + if (run) { + // run the callback directly, we don't hold the lock and don't need to fork! + notifyListener(listener, EsExecutors.newDirectExecutorService()); + } + } + } + + @Override + protected synchronized void done() { + done = true; + listeners.forEach(t -> notifyListener(t.v1(), t.v2())); + // release references to any listeners as we no longer need them and will live + // much longer than the listeners in most cases + listeners.clear(); + } + + private void notifyListener(ActionListener listener, ExecutorService executorService) { + try { + executorService.submit(() -> { + try { + // call get in a non-blocking fashion as we could be on a network thread + // or another thread like the scheduler, which we should never block! + V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS); + listener.onResponse(value); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public void onResponse(V v) { + final boolean set = set(v); + if (set == false) { + throw new IllegalStateException("did not set value, value or exception already set?"); + } + } + + @Override + public void onFailure(Exception e) { + final boolean set = setException(e); + if (set == false) { + throw new IllegalStateException("did not set exception, value already set or exception already set?"); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java index fe64fd16af6..3b183cce40b 100644 --- a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -457,6 +457,62 @@ public class CacheTests extends ESTestCase { assertEquals(notifications, invalidated); } + // randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null + public void testInvalidateWithValue() { + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set keys = new HashSet<>(); + for (Integer key : cache.keys()) { + if (rarely()) { + if (randomBoolean()) { + cache.invalidate(key, key.toString()); + keys.add(key); + } else { + // invalidate with incorrect value + cache.invalidate(key, Integer.toString(key * randomIntBetween(2, 10))); + } + } + } + for (int i = 0; i < numberOfEntries; i++) { + if (keys.contains(i)) { + assertNull(cache.get(i)); + } else { + assertNotNull(cache.get(i)); + } + } + } + + // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only + // those entries + public void testNotificationOnInvalidateWithValue() { + Set notifications = new HashSet<>(); + Cache cache = + CacheBuilder.builder() + .removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + if (randomBoolean()) { + cache.invalidate(i, Integer.toString(i)); + invalidated.add(i); + } else { + // invalidate with incorrect value + cache.invalidate(i, Integer.toString(i * randomIntBetween(2, 10))); + } + } + } + assertEquals(notifications, invalidated); + } + // invalidate all cached entries, then check that the cache is empty public void testInvalidateAll() { Cache cache = CacheBuilder.builder().build(); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java new file mode 100644 index 00000000000..712656777f9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.junit.After; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +public class ListenableFutureTests extends ESTestCase { + + private ExecutorService executorService; + + @After + public void stopExecutorService() throws InterruptedException { + if (executorService != null) { + terminate(executorService); + } + } + + public void testListenableFutureNotifiesListeners() { + ListenableFuture future = new ListenableFuture<>(); + AtomicInteger notifications = new AtomicInteger(0); + final int numberOfListeners = scaledRandomIntBetween(1, 12); + for (int i = 0; i < numberOfListeners; i++) { + future.addListener(ActionListener.wrap(notifications::incrementAndGet), EsExecutors.newDirectExecutorService()); + } + + future.onResponse(""); + assertEquals(numberOfListeners, notifications.get()); + assertTrue(future.isDone()); + } + + public void testListenableFutureNotifiesListenersOnException() { + ListenableFuture future = new ListenableFuture<>(); + AtomicInteger notifications = new AtomicInteger(0); + final int numberOfListeners = scaledRandomIntBetween(1, 12); + final Exception exception = new RuntimeException(); + for (int i = 0; i < numberOfListeners; i++) { + future.addListener(ActionListener.wrap(s -> fail("this should never be called"), e -> { + assertEquals(exception, e); + notifications.incrementAndGet(); + }), EsExecutors.newDirectExecutorService()); + } + + future.onFailure(exception); + assertEquals(numberOfListeners, notifications.get()); + assertTrue(future.isDone()); + } + + public void testConcurrentListenerRegistrationAndCompletion() throws BrokenBarrierException, InterruptedException { + final int numberOfThreads = scaledRandomIntBetween(2, 32); + final int completingThread = randomIntBetween(0, numberOfThreads - 1); + final ListenableFuture future = new ListenableFuture<>(); + executorService = EsExecutors.newFixed("testConcurrentListenerRegistrationAndCompletion", numberOfThreads, 1000, + EsExecutors.daemonThreadFactory("listener"), new ThreadContext(Settings.EMPTY)); + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + final CountDownLatch listenersLatch = new CountDownLatch(numberOfThreads - 1); + final AtomicInteger numResponses = new AtomicInteger(0); + final AtomicInteger numExceptions = new AtomicInteger(0); + + for (int i = 0; i < numberOfThreads; i++) { + final int threadNum = i; + Thread thread = new Thread(() -> { + try { + barrier.await(); + if (threadNum == completingThread) { + future.onResponse(""); + } else { + future.addListener(ActionListener.wrap(s -> { + assertEquals("", s); + numResponses.incrementAndGet(); + listenersLatch.countDown(); + }, e -> { + logger.error("caught unexpected exception", e); + numExceptions.incrementAndGet(); + listenersLatch.countDown(); + }), executorService); + } + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + }); + thread.start(); + } + + barrier.await(); + barrier.await(); + listenersLatch.await(); + + assertEquals(numberOfThreads - 1, numResponses.get()); + assertEquals(0, numExceptions.get()); + } +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 133093df33a..6d12b6472f1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -410,7 +410,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw final NativeRoleMappingStore nativeRoleMappingStore = new NativeRoleMappingStore(settings, client, securityIndex.get()); final AnonymousUser anonymousUser = new AnonymousUser(settings); final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore, - anonymousUser, securityIndex.get(), threadPool.getThreadContext()); + anonymousUser, securityIndex.get(), threadPool); Map realmFactories = new HashMap<>(InternalRealms.getFactories(threadPool, resourceWatcherService, getSslService(), nativeUsersStore, nativeRoleMappingStore, securityIndex.get())); for (SecurityExtension extension : securityExtensions) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java index 1e38e6fd103..d8d0d26f99e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java @@ -93,9 +93,9 @@ public final class InternalRealms { SecurityIndexManager securityIndex) { Map map = new HashMap<>(); - map.put(FileRealmSettings.TYPE, config -> new FileRealm(config, resourceWatcherService)); + map.put(FileRealmSettings.TYPE, config -> new FileRealm(config, resourceWatcherService, threadPool)); map.put(NativeRealmSettings.TYPE, config -> { - final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore); + final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore, threadPool); securityIndex.addIndexStateListener(nativeRealm::onSecurityIndexStateChange); return nativeRealm; }); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java index c9ccdbb75c0..af2bfcf0d6c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authc.esnative; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.RealmConfig; import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings; @@ -24,8 +25,8 @@ public class NativeRealm extends CachingUsernamePasswordRealm { private final NativeUsersStore userStore; - public NativeRealm(RealmConfig config, NativeUsersStore usersStore) { - super(NativeRealmSettings.TYPE, config); + public NativeRealm(RealmConfig config, NativeUsersStore usersStore, ThreadPool threadPool) { + super(NativeRealmSettings.TYPE, config, threadPool); this.userStore = usersStore; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealm.java index 7dbcea90872..3946a01784b 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealm.java @@ -14,8 +14,8 @@ import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityField; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; @@ -66,8 +66,8 @@ public class ReservedRealm extends CachingUsernamePasswordRealm { private final SecurityIndexManager securityIndex; public ReservedRealm(Environment env, Settings settings, NativeUsersStore nativeUsersStore, AnonymousUser anonymousUser, - SecurityIndexManager securityIndex, ThreadContext threadContext) { - super(TYPE, new RealmConfig(TYPE, Settings.EMPTY, settings, env, threadContext)); + SecurityIndexManager securityIndex, ThreadPool threadPool) { + super(TYPE, new RealmConfig(TYPE, Settings.EMPTY, settings, env, threadPool.getThreadContext()), threadPool); this.nativeUsersStore = nativeUsersStore; this.realmEnabled = XPackSettings.RESERVED_REALM_ENABLED_SETTING.get(settings); this.anonymousUser = anonymousUser; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/file/FileRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/file/FileRealm.java index 9e85b450521..88656b9e01e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/file/FileRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/file/FileRealm.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authc.file; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.RealmConfig; @@ -21,13 +22,13 @@ public class FileRealm extends CachingUsernamePasswordRealm { private final FileUserPasswdStore userPasswdStore; private final FileUserRolesStore userRolesStore; - public FileRealm(RealmConfig config, ResourceWatcherService watcherService) { - this(config, new FileUserPasswdStore(config, watcherService), new FileUserRolesStore(config, watcherService)); + public FileRealm(RealmConfig config, ResourceWatcherService watcherService, ThreadPool threadPool) { + this(config, new FileUserPasswdStore(config, watcherService), new FileUserRolesStore(config, watcherService), threadPool); } // pkg private for testing - FileRealm(RealmConfig config, FileUserPasswdStore userPasswdStore, FileUserRolesStore userRolesStore) { - super(FileRealmSettings.TYPE, config); + FileRealm(RealmConfig config, FileUserPasswdStore userPasswdStore, FileUserRolesStore userRolesStore, ThreadPool threadPool) { + super(FileRealmSettings.TYPE, config, threadPool); this.userPasswdStore = userPasswdStore; userPasswdStore.addListener(this::expireAll); this.userRolesStore = userRolesStore; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java index ceb28ada76a..a7c6efdda31 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java @@ -67,7 +67,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm { // pkg private for testing LdapRealm(String type, RealmConfig config, SessionFactory sessionFactory, UserRoleMapper roleMapper, ThreadPool threadPool) { - super(type, config); + super(type, config, threadPool); this.sessionFactory = sessionFactory; this.roleMapper = roleMapper; this.threadPool = threadPool; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java index e5a90c0855f..8dae5275eda 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java @@ -5,11 +5,15 @@ */ package org.elasticsearch.xpack.security.authc.support; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.AuthenticationToken; import org.elasticsearch.xpack.core.security.authc.RealmConfig; @@ -21,18 +25,21 @@ import org.elasticsearch.xpack.core.security.user.User; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm { - private final Cache cache; + private final Cache>> cache; + private final ThreadPool threadPool; final Hasher hasher; - protected CachingUsernamePasswordRealm(String type, RealmConfig config) { + protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPool threadPool) { super(type, config); hasher = Hasher.resolve(CachingUsernamePasswordRealmSettings.CACHE_HASH_ALGO_SETTING.get(config.settings()), Hasher.SSHA256); + this.threadPool = threadPool; TimeValue ttl = CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING.get(config.settings()); if (ttl.getNanos() > 0) { - cache = CacheBuilder.builder() + cache = CacheBuilder.>>builder() .setExpireAfterWrite(ttl) .setMaximumWeight(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING.get(config.settings())) .build(); @@ -78,74 +85,95 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm } private void authenticateWithCache(UsernamePasswordToken token, ActionListener listener) { - UserWithHash userWithHash = cache.get(token.principal()); - if (userWithHash == null) { - if (logger.isDebugEnabled()) { - logger.debug("user [{}] not found in cache for realm [{}], proceeding with normal authentication", - token.principal(), name()); - } - doAuthenticateAndCache(token, ActionListener.wrap((result) -> { - if (result.isAuthenticated()) { - final User user = result.getUser(); - logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), user.roles()); + try { + final SetOnce authenticatedUser = new SetOnce<>(); + final AtomicBoolean createdAndStartedFuture = new AtomicBoolean(false); + final ListenableFuture> future = cache.computeIfAbsent(token.principal(), k -> { + final ListenableFuture> created = new ListenableFuture<>(); + if (createdAndStartedFuture.compareAndSet(false, true) == false) { + throw new IllegalStateException("something else already started this. how?"); } - listener.onResponse(result); - }, listener::onFailure)); - } else if (userWithHash.hasHash()) { - if (userWithHash.verify(token.credentials())) { - if (userWithHash.user.enabled()) { - User user = userWithHash.user; - logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), user.roles()); - listener.onResponse(AuthenticationResult.success(user)); - } else { - // We successfully authenticated, but the cached user is disabled. - // Reload the primary record to check whether the user is still disabled - cache.invalidate(token.principal()); - doAuthenticateAndCache(token, ActionListener.wrap((result) -> { - if (result.isAuthenticated()) { - final User user = result.getUser(); - logger.debug("realm [{}] authenticated user [{}] (enabled:{}), with roles [{}]", name(), token.principal(), - user.enabled(), user.roles()); - } - listener.onResponse(result); - }, listener::onFailure)); - } - } else { - cache.invalidate(token.principal()); - doAuthenticateAndCache(token, ActionListener.wrap((result) -> { + return created; + }); + + if (createdAndStartedFuture.get()) { + doAuthenticate(token, ActionListener.wrap(result -> { if (result.isAuthenticated()) { final User user = result.getUser(); - logger.debug("cached user's password changed. realm [{}] authenticated user [{}], with roles [{}]", - name(), token.principal(), user.roles()); + authenticatedUser.set(user); + final UserWithHash userWithHash = new UserWithHash(user, token.credentials(), hasher); + future.onResponse(new Tuple<>(result, userWithHash)); + } else { + future.onResponse(new Tuple<>(result, null)); } - listener.onResponse(result); - }, listener::onFailure)); + }, future::onFailure)); } - } else { - cache.invalidate(token.principal()); - doAuthenticateAndCache(token, ActionListener.wrap((result) -> { - if (result.isAuthenticated()) { - final User user = result.getUser(); - logger.debug("cached user came from a lookup and could not be used for authentication. " + - "realm [{}] authenticated user [{}] with roles [{}]", name(), token.principal(), user.roles()); + + future.addListener(ActionListener.wrap(tuple -> { + if (tuple != null) { + final UserWithHash userWithHash = tuple.v2(); + final boolean performedAuthentication = createdAndStartedFuture.get() && userWithHash != null && + tuple.v2().user == authenticatedUser.get(); + handleResult(future, createdAndStartedFuture.get(), performedAuthentication, token, tuple, listener); + } else { + handleFailure(future, createdAndStartedFuture.get(), token, new IllegalStateException("unknown error authenticating"), + listener); } - listener.onResponse(result); - }, listener::onFailure)); + }, e -> handleFailure(future, createdAndStartedFuture.get(), token, e, listener)), + threadPool.executor(ThreadPool.Names.GENERIC)); + } catch (ExecutionException e) { + listener.onResponse(AuthenticationResult.unsuccessful("", e)); } } - private void doAuthenticateAndCache(UsernamePasswordToken token, ActionListener listener) { - ActionListener wrapped = ActionListener.wrap((result) -> { - Objects.requireNonNull(result, "AuthenticationResult cannot be null"); - if (result.getStatus() == AuthenticationResult.Status.SUCCESS) { - UserWithHash userWithHash = new UserWithHash(result.getUser(), token.credentials(), hasher); - // it doesn't matter if we already computed it elsewhere - cache.put(token.principal(), userWithHash); + private void handleResult(ListenableFuture> future, boolean createdAndStartedFuture, + boolean performedAuthentication, UsernamePasswordToken token, + Tuple result, ActionListener listener) { + final AuthenticationResult authResult = result.v1(); + if (authResult == null) { + // this was from a lookup; clear and redo + cache.invalidate(token.principal(), future); + authenticateWithCache(token, listener); + } else if (authResult.isAuthenticated()) { + if (performedAuthentication) { + listener.onResponse(authResult); + } else { + UserWithHash userWithHash = result.v2(); + if (userWithHash.verify(token.credentials())) { + if (userWithHash.user.enabled()) { + User user = userWithHash.user; + logger.debug("realm [{}] authenticated user [{}], with roles [{}]", + name(), token.principal(), user.roles()); + listener.onResponse(AuthenticationResult.success(user)); + } else { + // re-auth to see if user has been enabled + cache.invalidate(token.principal(), future); + authenticateWithCache(token, listener); + } + } else { + // could be a password change? + cache.invalidate(token.principal(), future); + authenticateWithCache(token, listener); + } } - listener.onResponse(result); - }, listener::onFailure); + } else { + cache.invalidate(token.principal(), future); + if (createdAndStartedFuture) { + listener.onResponse(authResult); + } else { + authenticateWithCache(token, listener); + } + } + } - doAuthenticate(token, wrapped); + private void handleFailure(ListenableFuture> future, boolean createdAndStarted, + UsernamePasswordToken token, Exception e, ActionListener listener) { + cache.invalidate(token.principal(), future); + if (createdAndStarted) { + listener.onFailure(e); + } else { + authenticateWithCache(token, listener); + } } @Override @@ -160,29 +188,34 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm @Override public final void lookupUser(String username, ActionListener listener) { if (cache != null) { - UserWithHash withHash = cache.get(username); - if (withHash == null) { - try { - doLookupUser(username, ActionListener.wrap((user) -> { - Runnable action = () -> listener.onResponse(null); + try { + ListenableFuture> future = cache.computeIfAbsent(username, key -> { + ListenableFuture> created = new ListenableFuture<>(); + doLookupUser(username, ActionListener.wrap(user -> { if (user != null) { UserWithHash userWithHash = new UserWithHash(user, null, null); - try { - // computeIfAbsent is used here to avoid overwriting a value from a concurrent authenticate call as it - // contains the password hash, which provides a performance boost and we shouldn't just erase that - cache.computeIfAbsent(username, (n) -> userWithHash); - action = () -> listener.onResponse(userWithHash.user); - } catch (ExecutionException e) { - action = () -> listener.onFailure(e); - } + created.onResponse(new Tuple<>(null, userWithHash)); + } else { + created.onResponse(new Tuple<>(null, null)); } - action.run(); - }, listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); - } - } else { - listener.onResponse(withHash.user); + }, created::onFailure)); + return created; + }); + + future.addListener(ActionListener.wrap(tuple -> { + if (tuple != null) { + if (tuple.v2() == null) { + cache.invalidate(username, future); + listener.onResponse(null); + } else { + listener.onResponse(tuple.v2().user); + } + } else { + listener.onResponse(null); + } + }, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC)); + } catch (ExecutionException e) { + listener.onFailure(e); } } else { doLookupUser(username, listener); @@ -192,12 +225,12 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm protected abstract void doLookupUser(String username, ActionListener listener); private static class UserWithHash { - User user; - char[] hash; - Hasher hasher; + final User user; + final char[] hash; + final Hasher hasher; UserWithHash(User user, SecureString password, Hasher hasher) { - this.user = user; + this.user = Objects.requireNonNull(user); this.hash = password == null ? null : hasher.hash(password); this.hasher = hasher; } @@ -205,9 +238,5 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm boolean verify(SecureString password) { return hash != null && hasher.verify(password, hash); } - - boolean hasHash() { - return hash != null; - } } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java index 6750560b0b0..2ad46723682 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java @@ -13,9 +13,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Strings; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.user.GetUsersRequest; @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore; import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm; import org.elasticsearch.xpack.security.authc.esnative.ReservedRealmTests; import org.elasticsearch.xpack.security.support.SecurityIndexManager; +import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -62,6 +63,7 @@ public class TransportGetUsersActionTests extends ESTestCase { private boolean anonymousEnabled; private Settings settings; + private ThreadPool threadPool; @Before public void maybeEnableAnonymous() { @@ -71,6 +73,14 @@ public class TransportGetUsersActionTests extends ESTestCase { } else { settings = Settings.EMPTY; } + threadPool = new TestThreadPool("TransportGetUsersActionTests"); + } + + @After + public void terminateThreadPool() throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } } public void testAnonymousUser() { @@ -79,10 +89,10 @@ public class TransportGetUsersActionTests extends ESTestCase { when(securityIndex.isAvailable()).thenReturn(true); AnonymousUser anonymousUser = new AnonymousUser(settings); ReservedRealm reservedRealm = - new ReservedRealm(mock(Environment.class), settings, usersStore, anonymousUser, securityIndex, new ThreadContext(Settings.EMPTY)); + new ReservedRealm(mock(Environment.class), settings, usersStore, anonymousUser, securityIndex, threadPool); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, reservedRealm); GetUsersRequest request = new GetUsersRequest(); @@ -117,7 +127,7 @@ public class TransportGetUsersActionTests extends ESTestCase { NativeUsersStore usersStore = mock(NativeUsersStore.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, mock(ReservedRealm.class)); GetUsersRequest request = new GetUsersRequest(); @@ -151,7 +161,7 @@ public class TransportGetUsersActionTests extends ESTestCase { ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap()); ReservedRealm reservedRealm = - new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings), securityIndex, new ThreadContext(Settings.EMPTY)); + new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings), securityIndex, threadPool); PlainActionFuture> userFuture = new PlainActionFuture<>(); reservedRealm.users(userFuture); final Collection allReservedUsers = userFuture.actionGet(); @@ -160,7 +170,7 @@ public class TransportGetUsersActionTests extends ESTestCase { final List names = reservedUsers.stream().map(User::principal).collect(Collectors.toList()); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, reservedRealm); logger.error("names {}", names); @@ -197,10 +207,10 @@ public class TransportGetUsersActionTests extends ESTestCase { when(securityIndex.isAvailable()).thenReturn(true); ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap()); ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings), - securityIndex, new ThreadContext(Settings.EMPTY)); + securityIndex, threadPool); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, reservedRealm); GetUsersRequest request = new GetUsersRequest(); @@ -247,7 +257,7 @@ public class TransportGetUsersActionTests extends ESTestCase { NativeUsersStore usersStore = mock(NativeUsersStore.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, mock(ReservedRealm.class)); GetUsersRequest request = new GetUsersRequest(); @@ -295,7 +305,7 @@ public class TransportGetUsersActionTests extends ESTestCase { NativeUsersStore usersStore = mock(NativeUsersStore.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportGetUsersAction action = new TransportGetUsersAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService, mock(ReservedRealm.class)); GetUsersRequest request = new GetUsersRequest(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java index 65cf74971a5..d059911a680 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java @@ -121,14 +121,16 @@ public class TransportPutUserActionTests extends ESTestCase { when(securityIndex.isAvailable()).thenReturn(true); ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap()); Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); ReservedRealm reservedRealm = new ReservedRealm(TestEnvironment.newEnvironment(settings), settings, usersStore, - new AnonymousUser(settings), securityIndex, new ThreadContext(settings)); + new AnonymousUser(settings), securityIndex, threadPool); PlainActionFuture> userFuture = new PlainActionFuture<>(); reservedRealm.users(userFuture); final User reserved = randomFrom(userFuture.actionGet().toArray(new User[0])); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); - TransportPutUserAction action = new TransportPutUserAction(Settings.EMPTY, mock(ThreadPool.class), mock(ActionFilters.class), + TransportPutUserAction action = new TransportPutUserAction(Settings.EMPTY, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), usersStore, transportService); PutUserRequest request = new PutUserRequest(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java index 7e2d5242101..633360318c2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.RealmConfig; import org.elasticsearch.xpack.security.support.SecurityIndexManager; @@ -18,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class NativeRealmTests extends ESTestCase { @@ -26,12 +28,15 @@ public class NativeRealmTests extends ESTestCase { } public void testCacheClearOnIndexHealthChange() { + final ThreadPool threadPool = mock(ThreadPool.class); + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); final AtomicInteger numInvalidation = new AtomicInteger(0); int expectedInvalidation = 0; Settings settings = Settings.builder().put("path.home", createTempDir()).build(); RealmConfig config = new RealmConfig("native", Settings.EMPTY, settings, TestEnvironment.newEnvironment(settings), new ThreadContext(settings)); - final NativeRealm nativeRealm = new NativeRealm(config, mock(NativeUsersStore.class)) { + final NativeRealm nativeRealm = new NativeRealm(config, mock(NativeUsersStore.class), threadPool) { @Override void clearCache() { numInvalidation.incrementAndGet(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java index 9fc52e8af63..b483595f8ec 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.esnative.ClientReservedRealm; @@ -63,6 +64,7 @@ public class ReservedRealmTests extends ESTestCase { private static final SecureString EMPTY_PASSWORD = new SecureString("".toCharArray()); private NativeUsersStore usersStore; private SecurityIndexManager securityIndex; + private ThreadPool threadPool; @Before public void setupMocks() throws Exception { @@ -71,6 +73,8 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.isAvailable()).thenReturn(true); when(securityIndex.checkMappingVersion(any())).thenReturn(true); mockGetAllReservedUserInfo(usersStore, Collections.emptyMap()); + threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); } public void testReservedUserEmptyPasswordAuthenticationFails() throws Throwable { @@ -78,7 +82,7 @@ public class ReservedRealmTests extends ESTestCase { UsernamesField.BEATS_NAME); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); @@ -94,7 +98,7 @@ public class ReservedRealmTests extends ESTestCase { } final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(settings), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(settings), securityIndex, threadPool); final User expected = randomReservedUser(true); final String principal = expected.principal(); @@ -116,7 +120,7 @@ public class ReservedRealmTests extends ESTestCase { private void verifySuccessfulAuthentication(boolean enabled) throws Exception { final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); final User expectedUser = randomReservedUser(enabled); final String principal = expectedUser.principal(); final SecureString newPassword = new SecureString("foobar".toCharArray()); @@ -157,7 +161,7 @@ public class ReservedRealmTests extends ESTestCase { public void testLookup() throws Exception { final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); final User expectedUser = randomReservedUser(true); final String principal = expectedUser.principal(); @@ -182,7 +186,7 @@ public class ReservedRealmTests extends ESTestCase { Settings settings = Settings.builder().put(XPackSettings.RESERVED_REALM_ENABLED_SETTING.getKey(), false).build(); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, new AnonymousUser(settings), - securityIndex, new ThreadContext(Settings.EMPTY)); + securityIndex, threadPool); final User expectedUser = randomReservedUser(true); final String principal = expectedUser.principal(); @@ -196,7 +200,7 @@ public class ReservedRealmTests extends ESTestCase { public void testLookupThrows() throws Exception { final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); final User expectedUser = randomReservedUser(true); final String principal = expectedUser.principal(); when(securityIndex.indexExists()).thenReturn(true); @@ -243,7 +247,7 @@ public class ReservedRealmTests extends ESTestCase { public void testGetUsers() { final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture> userFuture = new PlainActionFuture<>(); reservedRealm.users(userFuture); assertThat(userFuture.actionGet(), @@ -258,7 +262,7 @@ public class ReservedRealmTests extends ESTestCase { .build(); final AnonymousUser anonymousUser = new AnonymousUser(settings); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, anonymousUser, - securityIndex, new ThreadContext(Settings.EMPTY)); + securityIndex, threadPool); PlainActionFuture> userFuture = new PlainActionFuture<>(); reservedRealm.users(userFuture); if (anonymousEnabled) { @@ -275,7 +279,7 @@ public class ReservedRealmTests extends ESTestCase { ReservedUserInfo userInfo = new ReservedUserInfo(hash, true, false); mockGetAllReservedUserInfo(usersStore, Collections.singletonMap("elastic", userInfo)); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); if (randomBoolean()) { PlainActionFuture future = new PlainActionFuture<>(); @@ -305,7 +309,7 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(true); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); doAnswer((i) -> { @@ -327,7 +331,7 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(true); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); SecureString password = new SecureString("password".toCharArray()); doAnswer((i) -> { @@ -354,7 +358,7 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(false); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); reservedRealm.doAuthenticate(new UsernamePasswordToken(new ElasticUser(true).principal(), @@ -372,7 +376,7 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(true); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); final String principal = randomFrom(KibanaUser.NAME, LogstashSystemUser.NAME, BeatsSystemUser.NAME); @@ -394,7 +398,7 @@ public class ReservedRealmTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(false); final ReservedRealm reservedRealm = new ReservedRealm(mock(Environment.class), settings, usersStore, - new AnonymousUser(Settings.EMPTY), securityIndex, new ThreadContext(Settings.EMPTY)); + new AnonymousUser(Settings.EMPTY), securityIndex, threadPool); PlainActionFuture listener = new PlainActionFuture<>(); final String principal = randomFrom(KibanaUser.NAME, LogstashSystemUser.NAME, BeatsSystemUser.NAME); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java index b1500cc7520..b0f53229377 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.RealmConfig; @@ -50,20 +51,26 @@ public class FileRealmTests extends ESTestCase { private FileUserPasswdStore userPasswdStore; private FileUserRolesStore userRolesStore; private Settings globalSettings; + private ThreadPool threadPool; + private ThreadContext threadContext; @Before public void init() throws Exception { userPasswdStore = mock(FileUserPasswdStore.class); userRolesStore = mock(FileUserRolesStore.class); globalSettings = Settings.builder().put("path.home", createTempDir()).build(); + threadPool = mock(ThreadPool.class); + threadContext = new ThreadContext(globalSettings); + when(threadPool.getThreadContext()).thenReturn(threadContext); } public void testAuthenticate() throws Exception { when(userPasswdStore.verifyPassword(eq("user1"), eq(new SecureString("test123")), any(Supplier.class))) .thenAnswer(VERIFY_PASSWORD_ANSWER); when(userRolesStore.roles("user1")).thenReturn(new String[] { "role1", "role2" }); - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user1", new SecureString("test123")), future); final AuthenticationResult result = future.actionGet(); @@ -80,11 +87,12 @@ public class FileRealmTests extends ESTestCase { Settings settings = Settings.builder() .put("cache.hash_algo", Hasher.values()[randomIntBetween(0, Hasher.values().length - 1)].name().toLowerCase(Locale.ROOT)) .build(); - RealmConfig config = new RealmConfig("file-test", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); + RealmConfig config = new RealmConfig("file-test", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); when(userPasswdStore.verifyPassword(eq("user1"), eq(new SecureString("test123")), any(Supplier.class))) .thenAnswer(VERIFY_PASSWORD_ANSWER); when(userRolesStore.roles("user1")).thenReturn(new String[]{"role1", "role2"}); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user1", new SecureString("test123")), future); User user1 = future.actionGet().getUser(); @@ -95,13 +103,14 @@ public class FileRealmTests extends ESTestCase { } public void testAuthenticateCachingRefresh() throws Exception { - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); userPasswdStore = spy(new UserPasswdStore(config)); userRolesStore = spy(new UserRolesStore(config)); when(userPasswdStore.verifyPassword(eq("user1"), eq(new SecureString("test123")), any(Supplier.class))) .thenAnswer(VERIFY_PASSWORD_ANSWER); doReturn(new String[] { "role1", "role2" }).when(userRolesStore).roles("user1"); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user1", new SecureString("test123")), future); User user1 = future.actionGet().getUser(); @@ -134,11 +143,12 @@ public class FileRealmTests extends ESTestCase { } public void testToken() throws Exception { - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); when(userPasswdStore.verifyPassword(eq("user1"), eq(new SecureString("test123")), any(Supplier.class))) .thenAnswer(VERIFY_PASSWORD_ANSWER); when(userRolesStore.roles("user1")).thenReturn(new String[]{"role1", "role2"}); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); ThreadContext threadContext = new ThreadContext(Settings.EMPTY); UsernamePasswordToken.putTokenHeader(threadContext, new UsernamePasswordToken("user1", new SecureString("test123"))); @@ -153,8 +163,9 @@ public class FileRealmTests extends ESTestCase { public void testLookup() throws Exception { when(userPasswdStore.userExists("user1")).thenReturn(true); when(userRolesStore.roles("user1")).thenReturn(new String[] { "role1", "role2" }); - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("user1", future); @@ -170,8 +181,9 @@ public class FileRealmTests extends ESTestCase { public void testLookupCaching() throws Exception { when(userPasswdStore.userExists("user1")).thenReturn(true); when(userRolesStore.roles("user1")).thenReturn(new String[] { "role1", "role2" }); - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("user1", future); @@ -185,12 +197,13 @@ public class FileRealmTests extends ESTestCase { } public void testLookupCachingWithRefresh() throws Exception { - RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); + RealmConfig config = new RealmConfig("file-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); userPasswdStore = spy(new UserPasswdStore(config)); userRolesStore = spy(new UserRolesStore(config)); doReturn(true).when(userPasswdStore).userExists("user1"); doReturn(new String[] { "role1", "role2" }).when(userRolesStore).roles("user1"); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("user1", future); User user1 = future.actionGet(); @@ -231,8 +244,9 @@ public class FileRealmTests extends ESTestCase { int order = randomIntBetween(0, 10); settings.put("order", order); - RealmConfig config = new RealmConfig("file-realm", settings.build(), globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(globalSettings)); - FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore); + RealmConfig config = new RealmConfig("file-realm", settings.build(), globalSettings, TestEnvironment.newEnvironment(globalSettings), + threadContext); + FileRealm realm = new FileRealm(config, userPasswdStore, userRolesStore, threadPool); Map usage = realm.usageStats(); assertThat(usage, is(notNullValue())); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java index 87f62cd97a1..38a6344f98e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java @@ -14,6 +14,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.AuthenticationResult; import org.elasticsearch.xpack.core.security.authc.Realm; import org.elasticsearch.xpack.core.security.authc.RealmConfig; @@ -22,6 +24,7 @@ import org.elasticsearch.xpack.core.security.authc.support.CachingUsernamePasswo import org.elasticsearch.xpack.core.security.authc.support.Hasher; import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken; import org.elasticsearch.xpack.core.security.user.User; +import org.junit.After; import org.junit.Before; import java.util.ArrayList; @@ -42,10 +45,19 @@ import static org.hamcrest.Matchers.sameInstance; public class CachingUsernamePasswordRealmTests extends ESTestCase { private Settings globalSettings; + private ThreadPool threadPool; @Before public void setup() { globalSettings = Settings.builder().put("path.home", createTempDir()).build(); + threadPool = new TestThreadPool("caching username password realm tests"); + } + + @After + public void stop() throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } } public void testSettings() throws Exception { @@ -61,7 +73,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { RealmConfig config = new RealmConfig("test_realm", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config) { + CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config, threadPool) { @Override protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { listener.onResponse(AuthenticationResult.success(new User("username", new String[]{"r1", "r2", "r3"}))); @@ -77,7 +89,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testAuthCache() { - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool); SecureString pass = new SecureString("pass"); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("a", pass), future); @@ -106,7 +118,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testLookupCache() { - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("a", future); future.actionGet(); @@ -133,7 +145,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testLookupAndAuthCache() { - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool); // lookup first PlainActionFuture lookupFuture = new PlainActionFuture<>(); realm.lookupUser("a", lookupFuture); @@ -172,7 +184,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testCacheChangePassword() { - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool); String user = "testUser"; SecureString pass1 = new SecureString("pass"); @@ -198,7 +210,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testCacheDisabledUser() { - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool); realm.setUsersEnabled(false); String user = "testUser"; @@ -233,7 +245,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { .build(); RealmConfig config = new RealmConfig("test_cache_ttl", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(config); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(config, threadPool); final UsernamePasswordToken authToken = new UsernamePasswordToken("the-user", new SecureString("the-password")); @@ -262,7 +274,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { .build(); RealmConfig config = new RealmConfig("test_cache_ttl", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(config); + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(config, threadPool); final UsernamePasswordToken authToken = new UsernamePasswordToken("the-user", new SecureString("the-password")); PlainActionFuture future = new PlainActionFuture<>(); @@ -304,13 +316,13 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testAuthenticateContract() throws Exception { - Realm realm = new FailingAuthenticationRealm(Settings.EMPTY, globalSettings); + Realm realm = new FailingAuthenticationRealm(Settings.EMPTY, globalSettings, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user", new SecureString("pass")), future); User user = future.actionGet().getUser(); assertThat(user, nullValue()); - realm = new ThrowingAuthenticationRealm(Settings.EMPTY, globalSettings); + realm = new ThrowingAuthenticationRealm(Settings.EMPTY, globalSettings, threadPool); future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user", new SecureString("pass")), future); RuntimeException e = expectThrows(RuntimeException.class, future::actionGet); @@ -318,19 +330,85 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } public void testLookupContract() throws Exception { - Realm realm = new FailingAuthenticationRealm(Settings.EMPTY, globalSettings); + Realm realm = new FailingAuthenticationRealm(Settings.EMPTY, globalSettings, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("user", future); User user = future.actionGet(); assertThat(user, nullValue()); - realm = new ThrowingAuthenticationRealm(Settings.EMPTY, globalSettings); + realm = new ThrowingAuthenticationRealm(Settings.EMPTY, globalSettings, threadPool); future = new PlainActionFuture<>(); realm.lookupUser("user", future); RuntimeException e = expectThrows(RuntimeException.class, future::actionGet); assertThat(e.getMessage(), containsString("lookup exception")); } + public void testSingleAuthPerUserLimit() throws Exception { + final String username = "username"; + final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; + final AtomicInteger authCounter = new AtomicInteger(0); + + final String passwordHash = new String(Hasher.BCRYPT.hash(password)); + RealmConfig config = new RealmConfig("test_realm", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), + new ThreadContext(Settings.EMPTY)); + final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config, threadPool) { + @Override + protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { + authCounter.incrementAndGet(); + // do something slow + if (BCrypt.checkpw(token.credentials(), passwordHash)) { + listener.onResponse(AuthenticationResult.success(new User(username, new String[]{"r1", "r2", "r3"}))); + } else { + listener.onFailure(new IllegalStateException("password auth should never fail")); + } + } + + @Override + protected void doLookupUser(String username, ActionListener listener) { + listener.onFailure(new UnsupportedOperationException("this method should not be called")); + } + }; + + final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); + final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3); + final int numberOfIterations = scaledRandomIntBetween(20, 100); + final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + List threads = new ArrayList<>(numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { + threads.add(new Thread(() -> { + try { + latch.countDown(); + latch.await(); + for (int i1 = 0; i1 < numberOfIterations; i1++) { + UsernamePasswordToken token = new UsernamePasswordToken(username, password); + + realm.authenticate(token, ActionListener.wrap((result) -> { + if (result.isAuthenticated() == false) { + throw new IllegalStateException("proper password led to an unauthenticated result: " + result); + } + }, (e) -> { + logger.error("caught exception", e); + fail("unexpected exception - " + e); + })); + } + + } catch (InterruptedException e) { + logger.error("thread was interrupted", e); + Thread.currentThread().interrupt(); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertEquals(1, authCounter.get()); + } + public void testCacheConcurrency() throws Exception { final String username = "username"; final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; @@ -339,7 +417,7 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { final String passwordHash = new String(Hasher.BCRYPT.hash(password)); RealmConfig config = new RealmConfig("test_realm", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config) { + final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config, threadPool) { @Override protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { // do something slow @@ -356,37 +434,37 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { } }; - final CountDownLatch latch = new CountDownLatch(1); final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3); final int numberOfIterations = scaledRandomIntBetween(20, 100); - List threads = new ArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + List threads = new ArrayList<>(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { final boolean invalidPassword = randomBoolean(); - threads.add(new Thread() { - @Override - public void run() { - try { - latch.await(); - for (int i = 0; i < numberOfIterations; i++) { - UsernamePasswordToken token = new UsernamePasswordToken(username, invalidPassword ? randomPassword : password); + threads.add(new Thread(() -> { + try { + latch.countDown(); + latch.await(); + for (int i1 = 0; i1 < numberOfIterations; i1++) { + UsernamePasswordToken token = new UsernamePasswordToken(username, invalidPassword ? randomPassword : password); - realm.authenticate(token, ActionListener.wrap((result) -> { - if (invalidPassword && result.isAuthenticated()) { - throw new RuntimeException("invalid password led to an authenticated user: " + result); - } else if (invalidPassword == false && result.isAuthenticated() == false) { - throw new RuntimeException("proper password led to an unauthenticated result: " + result); - } - }, (e) -> { - logger.error("caught exception", e); - fail("unexpected exception - " + e); - })); - } - - } catch (InterruptedException e) { + realm.authenticate(token, ActionListener.wrap((result) -> { + if (invalidPassword && result.isAuthenticated()) { + throw new RuntimeException("invalid password led to an authenticated user: " + result); + } else if (invalidPassword == false && result.isAuthenticated() == false) { + throw new RuntimeException("proper password led to an unauthenticated result: " + result); + } + }, (e) -> { + logger.error("caught exception", e); + fail("unexpected exception - " + e); + })); } + + } catch (InterruptedException e) { + logger.error("thread was interrupted", e); + Thread.currentThread().interrupt(); } - }); + })); } for (Thread thread : threads) { @@ -400,10 +478,11 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { public void testUserLookupConcurrency() throws Exception { final String username = "username"; + final AtomicInteger lookupCounter = new AtomicInteger(0); RealmConfig config = new RealmConfig("test_realm", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config) { + final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config, threadPool) { @Override protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { listener.onFailure(new UnsupportedOperationException("authenticate should not be called!")); @@ -411,36 +490,37 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { @Override protected void doLookupUser(String username, ActionListener listener) { + lookupCounter.incrementAndGet(); listener.onResponse(new User(username, new String[]{"r1", "r2", "r3"})); } }; - final CountDownLatch latch = new CountDownLatch(1); final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); final int numberOfThreads = scaledRandomIntBetween(numberOfProcessors, numberOfProcessors * 3); final int numberOfIterations = scaledRandomIntBetween(10000, 100000); - List threads = new ArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + List threads = new ArrayList<>(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { - threads.add(new Thread() { - @Override - public void run() { - try { - latch.await(); - for (int i = 0; i < numberOfIterations; i++) { - realm.lookupUser(username, ActionListener.wrap((user) -> { - if (user == null) { - throw new RuntimeException("failed to lookup user"); - } - }, (e) -> { - logger.error("caught exception", e); - fail("unexpected exception"); - })); - } - - } catch (InterruptedException e) { + threads.add(new Thread(() -> { + try { + latch.countDown(); + latch.await(); + for (int i1 = 0; i1 < numberOfIterations; i1++) { + realm.lookupUser(username, ActionListener.wrap((user) -> { + if (user == null) { + throw new RuntimeException("failed to lookup user"); + } + }, (e) -> { + logger.error("caught exception", e); + fail("unexpected exception"); + })); } + + } catch (InterruptedException e) { + logger.error("thread was interrupted", e); + Thread.currentThread().interrupt(); } - }); + })); } for (Thread thread : threads) { @@ -450,13 +530,14 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { for (Thread thread : threads) { thread.join(); } + assertEquals(1, lookupCounter.get()); } static class FailingAuthenticationRealm extends CachingUsernamePasswordRealm { - FailingAuthenticationRealm(Settings settings, Settings global) { + FailingAuthenticationRealm(Settings settings, Settings global, ThreadPool threadPool) { super("failing", new RealmConfig("failing-test", settings, global, TestEnvironment.newEnvironment(global), - new ThreadContext(Settings.EMPTY))); + threadPool.getThreadContext()), threadPool); } @Override @@ -472,9 +553,9 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { static class ThrowingAuthenticationRealm extends CachingUsernamePasswordRealm { - ThrowingAuthenticationRealm(Settings settings, Settings globalSettings) { + ThrowingAuthenticationRealm(Settings settings, Settings globalSettings, ThreadPool threadPool) { super("throwing", new RealmConfig("throwing-test", settings, globalSettings, TestEnvironment.newEnvironment(globalSettings), - new ThreadContext(Settings.EMPTY))); + threadPool.getThreadContext()), threadPool); } @Override @@ -495,13 +576,13 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { private boolean usersEnabled = true; - AlwaysAuthenticateCachingRealm(Settings globalSettings) { + AlwaysAuthenticateCachingRealm(Settings globalSettings, ThreadPool threadPool) { this(new RealmConfig("always-test", Settings.EMPTY, globalSettings, TestEnvironment.newEnvironment(globalSettings), - new ThreadContext(Settings.EMPTY))); + threadPool.getThreadContext()), threadPool); } - AlwaysAuthenticateCachingRealm(RealmConfig config) { - super("always", config); + AlwaysAuthenticateCachingRealm(RealmConfig config, ThreadPool threadPool) { + super("always", config, threadPool); } void setUsersEnabled(boolean usersEnabled) { @@ -527,9 +608,9 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { public final AtomicInteger authInvocationCounter = new AtomicInteger(0); public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0); - LookupNotSupportedRealm(Settings globalSettings) { + LookupNotSupportedRealm(Settings globalSettings, ThreadPool threadPool) { super("lookup", new RealmConfig("lookup-notsupported-test", Settings.EMPTY, globalSettings, - TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY))); + TestEnvironment.newEnvironment(globalSettings), threadPool.getThreadContext()), threadPool); } @Override diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStoreTests.java index 2a1c2dabe30..052ba385510 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStoreTests.java @@ -198,7 +198,7 @@ public class NativeRoleMappingStoreTests extends ESTestCase { final Environment env = TestEnvironment.newEnvironment(settings); final RealmConfig realmConfig = new RealmConfig(getTestName(), Settings.EMPTY, settings, env, threadContext); - final CachingUsernamePasswordRealm mockRealm = new CachingUsernamePasswordRealm("test", realmConfig) { + final CachingUsernamePasswordRealm mockRealm = new CachingUsernamePasswordRealm("test", realmConfig, threadPool) { @Override protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { listener.onResponse(AuthenticationResult.notHandled());