Tidy up GroupedActionListener (#39633)

Today the `GroupedActionListener` accepts a `defaults` parameter but all
callers pass an empty list. Also it is permitted to pass an empty group but
this is trappy because the delegated listener is never be called in that case.
This commit removes the `defaults` parameter and forbids an empty group.
This commit is contained in:
David Turner 2019-03-06 09:23:12 +00:00
parent c91dcbd5ee
commit 77dd711847
9 changed files with 23 additions and 36 deletions

View File

@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* An action listener that delegates it's results to another listener once
* it has received one or more failures or N results. This allows synchronous
* An action listener that delegates its results to another listener once
* it has received N results (either successes or failures). This allows synchronous
* tasks to be forked off in a loop with the same listener and respond to a
* higher level listener once all tasks responded.
*/
@ -39,7 +39,6 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
private final AtomicInteger pos = new AtomicInteger();
private final AtomicArray<T> results;
private final ActionListener<Collection<T>> delegate;
private final Collection<T> defaults;
private final AtomicReference<Exception> failure = new AtomicReference<>();
/**
@ -47,12 +46,13 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
* @param delegate the delegate listener
* @param groupSize the group size
*/
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
Collection<T> defaults) {
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize) {
if (groupSize <= 0) {
throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize);
}
results = new AtomicArray<>(groupSize);
countDown = new CountDown(groupSize);
this.delegate = delegate;
this.defaults = defaults;
}
@Override
@ -63,7 +63,6 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
delegate.onFailure(failure.get());
} else {
List<T> collect = this.results.asList();
collect.addAll(defaults);
delegate.onResponse(Collections.unmodifiableList(collect));
}
}

View File

@ -51,10 +51,7 @@ public class GroupedActionListenerTests extends ESTestCase {
};
final int groupSize = randomIntBetween(10, 1000);
AtomicInteger count = new AtomicInteger();
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) :
Collections.emptyList();
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize,
defaults);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize);
int numThreads = randomIntBetween(2, 5);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
@ -65,7 +62,7 @@ public class GroupedActionListenerTests extends ESTestCase {
} catch (Exception e) {
throw new AssertionError(e);
}
int c = 0;
int c;
while((c = count.incrementAndGet()) <= groupSize) {
listener.onResponse(c-1);
}
@ -78,10 +75,9 @@ public class GroupedActionListenerTests extends ESTestCase {
assertNotNull(resRef.get());
ArrayList<Integer> list = new ArrayList<>(resRef.get());
Collections.sort(list);
int expectedSize = groupSize + defaults.size();
assertEquals(expectedSize, resRef.get().size());
int expectedValue = defaults.isEmpty() ? 0 : -1;
for (int i = 0; i < expectedSize; i++) {
assertEquals(groupSize, resRef.get().size());
int expectedValue = 0;
for (int i = 0; i < groupSize; i++) {
assertEquals(Integer.valueOf(expectedValue++), list.get(i));
}
}
@ -101,9 +97,8 @@ public class GroupedActionListenerTests extends ESTestCase {
excRef.set(e);
}
};
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList();
int size = randomIntBetween(3, 4);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size, defaults);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size);
listener.onResponse(0);
IOException ioException = new IOException();
RuntimeException rtException = new RuntimeException();
@ -124,8 +119,7 @@ public class GroupedActionListenerTests extends ESTestCase {
public void testConcurrentFailures() throws InterruptedException {
AtomicReference<Exception> finalException = new AtomicReference<>();
int numGroups = randomIntBetween(10, 100);
GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(r -> {}, finalException::set), numGroups, Collections.emptyList());
GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(r -> {}, finalException::set), numGroups);
ExecutorService executorService = Executors.newFixedThreadPool(numGroups);
for (int i = 0; i < numGroups; i++) {
executorService.submit(() -> listener.onFailure(new IOException()));

View File

@ -433,7 +433,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
ActionListener.wrap(
rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))),
e -> { throw new AssertionError("must never happen", e); }),
leaderIndicesToFollow.size(), Collections.emptyList());
leaderIndicesToFollow.size());
for (final Index indexToFollow : leaderIndicesToFollow) {
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()

View File

@ -41,7 +41,6 @@ import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@ -137,8 +136,8 @@ public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowA
}
},
numberOfShards,
Collections.emptyList());
numberOfShards
);
for (int i = 0; i < numberOfShards; i++) {
final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i);
final ShardId leaderShardId = new ShardId(leaderIndex, i);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.xpack.security.authc.saml.SamlRedirect;
import org.elasticsearch.xpack.security.authc.saml.SamlUtils;
import org.opensaml.saml.saml2.core.LogoutResponse;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -98,9 +97,7 @@ public final class TransportSamlInvalidateSessionAction
listener.onResponse(0);
} else {
GroupedActionListener<TokensInvalidationResult> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure),
tokens.size(), Collections.emptyList()
);
ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure), tokens.size());
tokens.forEach(tuple -> invalidateTokenPair(tuple, groupedListener));
}
}, listener::onFailure

View File

@ -70,7 +70,7 @@ public class TransportGetUsersAction extends HandledTransportAction<GetUsersRequ
listener.onResponse(new GetUsersResponse(users));
}, listener::onFailure);
final GroupedActionListener<Collection<User>> groupListener =
new GroupedActionListener<>(sendingListener, 2, Collections.emptyList());
new GroupedActionListener<>(sendingListener, 2);
// We have two sources for the users object, the reservedRealm and the usersStore, we query both at the same time with a
// GroupedActionListener
if (realmLookup.isEmpty()) {
@ -84,8 +84,7 @@ public class TransportGetUsersAction extends HandledTransportAction<GetUsersRequ
} else {
// nested group listener action here - for each of the users we got and fetch it concurrently - once we are done we notify
// the "global" group listener.
GroupedActionListener<User> realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size(),
Collections.emptyList());
GroupedActionListener<User> realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size());
for (String user : realmLookup) {
reservedRealm.lookupUser(user, realmGroupListener);
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.security.authc.support.mapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -43,7 +42,7 @@ public class CompositeRoleMapper implements UserRoleMapper {
public void resolveRoles(UserData user, ActionListener<Set<String>> listener) {
GroupedActionListener<Set<String>> groupListener = new GroupedActionListener<>(ActionListener.wrap(
composite -> listener.onResponse(composite.stream().flatMap(Set::stream).collect(Collectors.toSet())), listener::onFailure
), delegates.size(), Collections.emptyList());
), delegates.size());
this.delegates.forEach(mapper -> mapper.resolveRoles(user, groupListener));
}

View File

@ -506,7 +506,7 @@ public class AuthorizationService {
listener.onResponse(null);
}, listener::onFailure);
final ActionListener<Tuple<String, IndexAuthorizationResult>> groupedActionListener = wrapPreservingContext(
new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size(), Collections.emptyList()), threadContext);
new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size()), threadContext);
actionToIndicesMap.forEach((bulkItemAction, indices) -> {
final RequestInfo bulkItemInfo =

View File

@ -191,7 +191,7 @@ public class NativePrivilegeStore {
.map(NativePrivilegeStore::nameFromDocId)
.collect(TUPLES_TO_MAP);
clearRolesCache(listener, createdNames);
}, listener::onFailure), privileges.size(), Collections.emptyList());
}, listener::onFailure), privileges.size());
for (ApplicationPrivilegeDescriptor privilege : privileges) {
innerPutPrivilege(privilege, refreshPolicy, groupListener);
}
@ -232,7 +232,7 @@ public class NativePrivilegeStore {
.map(NativePrivilegeStore::nameFromDocId)
.collect(TUPLES_TO_MAP);
clearRolesCache(listener, deletedNames);
}, listener::onFailure), names.size(), Collections.emptyList());
}, listener::onFailure), names.size());
for (String name : names) {
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareDelete(SECURITY_INDEX_NAME, SINGLE_MAPPING_NAME, toDocId(application, name))