Synchronize pendingOutgoingJoins (#39900)

Today we use a ConcurrentHashSet to track the in-flight outgoing joins in the
`JoinHelper`. This is fine for adding and removing elements, but not for the
emptiness test in `isJoinPending()`, which might return false if one join
finishes just after another one starts, even though joins were pending
throughout.

As used today this is OK: such a false result means the node was trying to join
a master but that join attempt has just finished unsuccessfully, and it causes
the node to (rightfully) reject a `FollowerCheck` from the failed master.
However, this kind of API inconsistency is trappy and there is no need to be
clever here, so this change replaces the set with a `synchronizedSet()`.
This commit is contained in:
David Turner 2019-03-11 12:00:56 +00:00
parent a079b9fd6d
commit 6e4f304f88

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.MembershipAction; import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -53,7 +52,9 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -82,7 +83,7 @@ public class JoinHelper {
private final JoinTaskExecutor joinTaskExecutor; private final JoinTaskExecutor joinTaskExecutor;
private final TimeValue joinTimeout; private final TimeValue joinTimeout;
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier, TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
@ -190,8 +191,7 @@ public class JoinHelper {
} }
boolean isJoinPending() { boolean isJoinPending() {
// cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized. return pendingOutgoingJoins.isEmpty() == false;
return pendingOutgoingJoins.iterator().hasNext();
} }
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) { public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {