diff --git a/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java b/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java index 532396ee609..2bb51c7194a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java @@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** - * An action listener that delegates it's results to another listener once - * it has received one or more failures or N results. This allows synchronous + * An action listener that delegates its results to another listener once + * it has received N results (either successes or failures). This allows synchronous * tasks to be forked off in a loop with the same listener and respond to a * higher level listener once all tasks responded. */ @@ -39,7 +39,6 @@ public final class GroupedActionListener implements ActionListener { private final AtomicInteger pos = new AtomicInteger(); private final AtomicArray results; private final ActionListener> delegate; - private final Collection defaults; private final AtomicReference failure = new AtomicReference<>(); /** @@ -47,12 +46,13 @@ public final class GroupedActionListener implements ActionListener { * @param delegate the delegate listener * @param groupSize the group size */ - public GroupedActionListener(ActionListener> delegate, int groupSize, - Collection defaults) { + public GroupedActionListener(ActionListener> delegate, int groupSize) { + if (groupSize <= 0) { + throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize); + } results = new AtomicArray<>(groupSize); countDown = new CountDown(groupSize); this.delegate = delegate; - this.defaults = defaults; } @Override @@ -63,7 +63,6 @@ public final class GroupedActionListener implements ActionListener { delegate.onFailure(failure.get()); } else { List collect = this.results.asList(); - collect.addAll(defaults); delegate.onResponse(Collections.unmodifiableList(collect)); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java index 9f6454d4e4b..fb32656da91 100644 --- a/server/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java @@ -51,10 +51,7 @@ public class GroupedActionListenerTests extends ESTestCase { }; final int groupSize = randomIntBetween(10, 1000); AtomicInteger count = new AtomicInteger(); - Collection defaults = randomBoolean() ? Collections.singletonList(-1) : - Collections.emptyList(); - GroupedActionListener listener = new GroupedActionListener<>(result, groupSize, - defaults); + GroupedActionListener listener = new GroupedActionListener<>(result, groupSize); int numThreads = randomIntBetween(2, 5); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -65,7 +62,7 @@ public class GroupedActionListenerTests extends ESTestCase { } catch (Exception e) { throw new AssertionError(e); } - int c = 0; + int c; while((c = count.incrementAndGet()) <= groupSize) { listener.onResponse(c-1); } @@ -78,10 +75,9 @@ public class GroupedActionListenerTests extends ESTestCase { assertNotNull(resRef.get()); ArrayList list = new ArrayList<>(resRef.get()); Collections.sort(list); - int expectedSize = groupSize + defaults.size(); - assertEquals(expectedSize, resRef.get().size()); - int expectedValue = defaults.isEmpty() ? 0 : -1; - for (int i = 0; i < expectedSize; i++) { + assertEquals(groupSize, resRef.get().size()); + int expectedValue = 0; + for (int i = 0; i < groupSize; i++) { assertEquals(Integer.valueOf(expectedValue++), list.get(i)); } } @@ -101,9 +97,8 @@ public class GroupedActionListenerTests extends ESTestCase { excRef.set(e); } }; - Collection defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList(); int size = randomIntBetween(3, 4); - GroupedActionListener listener = new GroupedActionListener<>(result, size, defaults); + GroupedActionListener listener = new GroupedActionListener<>(result, size); listener.onResponse(0); IOException ioException = new IOException(); RuntimeException rtException = new RuntimeException(); @@ -124,8 +119,7 @@ public class GroupedActionListenerTests extends ESTestCase { public void testConcurrentFailures() throws InterruptedException { AtomicReference finalException = new AtomicReference<>(); int numGroups = randomIntBetween(10, 100); - GroupedActionListener listener = new GroupedActionListener<>( - ActionListener.wrap(r -> {}, finalException::set), numGroups, Collections.emptyList()); + GroupedActionListener listener = new GroupedActionListener<>(ActionListener.wrap(r -> {}, finalException::set), numGroups); ExecutorService executorService = Executors.newFixedThreadPool(numGroups); for (int i = 0; i < numGroups; i++) { executorService.submit(() -> listener.onFailure(new IOException())); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index ab78dbf3d62..ec24289cd12 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -433,7 +433,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { ActionListener.wrap( rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))), e -> { throw new AssertionError("must never happen", e); }), - leaderIndicesToFollow.size(), Collections.emptyList()); + leaderIndicesToFollow.size()); for (final Index indexToFollow : leaderIndicesToFollow) { List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 0e6b0ccceff..e6c889ca6e3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -41,7 +41,6 @@ import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Collection; -import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -137,8 +136,8 @@ public class TransportUnfollowAction extends TransportMasterNodeAction groupedListener = new GroupedActionListener<>( - ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure), - tokens.size(), Collections.emptyList() - ); + ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure), tokens.size()); tokens.forEach(tuple -> invalidateTokenPair(tuple, groupedListener)); } }, listener::onFailure diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersAction.java index 48b73ce1324..3adfd4a807e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersAction.java @@ -70,7 +70,7 @@ public class TransportGetUsersAction extends HandledTransportAction> groupListener = - new GroupedActionListener<>(sendingListener, 2, Collections.emptyList()); + new GroupedActionListener<>(sendingListener, 2); // We have two sources for the users object, the reservedRealm and the usersStore, we query both at the same time with a // GroupedActionListener if (realmLookup.isEmpty()) { @@ -84,8 +84,7 @@ public class TransportGetUsersAction extends HandledTransportAction realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size(), - Collections.emptyList()); + GroupedActionListener realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size()); for (String user : realmLookup) { reservedRealm.lookupUser(user, realmGroupListener); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/CompositeRoleMapper.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/CompositeRoleMapper.java index 1723473df05..21f6a1e2a8c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/CompositeRoleMapper.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/CompositeRoleMapper.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.security.authc.support.mapper; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -43,7 +42,7 @@ public class CompositeRoleMapper implements UserRoleMapper { public void resolveRoles(UserData user, ActionListener> listener) { GroupedActionListener> groupListener = new GroupedActionListener<>(ActionListener.wrap( composite -> listener.onResponse(composite.stream().flatMap(Set::stream).collect(Collectors.toSet())), listener::onFailure - ), delegates.size(), Collections.emptyList()); + ), delegates.size()); this.delegates.forEach(mapper -> mapper.resolveRoles(user, groupListener)); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index acaf152628e..ba4839e67d2 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -506,7 +506,7 @@ public class AuthorizationService { listener.onResponse(null); }, listener::onFailure); final ActionListener> groupedActionListener = wrapPreservingContext( - new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size(), Collections.emptyList()), threadContext); + new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size()), threadContext); actionToIndicesMap.forEach((bulkItemAction, indices) -> { final RequestInfo bulkItemInfo = diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java index de84311c541..09c89752f83 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java @@ -191,7 +191,7 @@ public class NativePrivilegeStore { .map(NativePrivilegeStore::nameFromDocId) .collect(TUPLES_TO_MAP); clearRolesCache(listener, createdNames); - }, listener::onFailure), privileges.size(), Collections.emptyList()); + }, listener::onFailure), privileges.size()); for (ApplicationPrivilegeDescriptor privilege : privileges) { innerPutPrivilege(privilege, refreshPolicy, groupListener); } @@ -232,7 +232,7 @@ public class NativePrivilegeStore { .map(NativePrivilegeStore::nameFromDocId) .collect(TUPLES_TO_MAP); clearRolesCache(listener, deletedNames); - }, listener::onFailure), names.size(), Collections.emptyList()); + }, listener::onFailure), names.size()); for (String name : names) { ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareDelete(SECURITY_INDEX_NAME, SINGLE_MAPPING_NAME, toDocId(application, name))