Simplify AutoFollowCoordinator with GroupedListener (#39603)

This change simplifies AutoFollowCoordinator by replacing a combination
of AtomicArray and CountDown with GroupedActionListener.
This commit is contained in:
Nhat Nguyen 2019-03-04 12:47:43 -05:00
parent 7b8ff2d7c5
commit af4918ebff
1 changed files with 16 additions and 30 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -428,23 +429,21 @@ public class AutoFollowCoordinator implements ClusterStateListener {
MetaData remoteMetadata,
MetaData localMetadata,
Consumer<AutoFollowResult> resultHandler) {
final GroupedActionListener<Tuple<Index, Exception>> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(
rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))),
e -> { throw new AssertionError("must never happen", e); }),
leaderIndicesToFollow.size(), Collections.emptyList());
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i;
for (final Index indexToFollow : leaderIndicesToFollow) {
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName()))
.map(Tuple::v1)
.collect(Collectors.toList());
if (otherMatchingPatterns.size() != 0) {
results.set(slot, new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() +
"] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "")));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
groupedListener.onResponse(
new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() +
"] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "")));
} else {
final Settings leaderIndexSettings = remoteMetadata.getIndexSafe(indexToFollow).getSettings();
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexSettings) == false) {
@ -456,28 +455,15 @@ public class AutoFollowCoordinator implements ClusterStateListener {
if (error != null) {
failure.addSuppressed(error);
}
results.set(slot, new Tuple<>(indexToFollow, failure));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
groupedListener.onResponse(new Tuple<>(indexToFollow, failure));
});
continue;
} else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) {
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
continue;
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow),
error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)));
} else {
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers,
error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)));
}
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
}
}
}