From 6e4f304f889354edf04d5e3feb26a86694d4931a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 11 Mar 2019 12:00:56 +0000 Subject: [PATCH] Synchronize pendingOutgoingJoins (#39900) Today we use a ConcurrentHashSet to track the in-flight outgoing joins in the `JoinHelper`. This is fine for adding and removing elements but not for the emptiness test in `isJoinPending()` which might return false if one join finishes just after another one starts, even though joins were pending throughout. As used today this is ok: it means the node was trying to join a master but this join attempt just finished unsuccessfully, and causes it to (rightfully) reject a `FollowerCheck` from the failed master. However this kind of API inconsistency is trappy and there is no need to be clever here, so this change replaces the set with a `synchronizedSet()`. --- .../elasticsearch/cluster/coordination/JoinHelper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 9705a444c03..db55ba03eb0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.zen.MembershipAction; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; @@ -53,7 +52,9 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -82,7 +83,7 @@ public class JoinHelper { private final JoinTaskExecutor joinTaskExecutor; private final TimeValue joinTimeout; - final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); + private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, @@ -190,8 +191,7 @@ public class JoinHelper { } boolean isJoinPending() { - // cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized. - return pendingOutgoingJoins.iterator().hasNext(); + return pendingOutgoingJoins.isEmpty() == false; } public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) {