diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 76d94966dd2..ea45eb42d89 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -101,13 +101,12 @@ public class NodeJoinTests extends ESTestCase { masterService.close(); } - private static ClusterState initialState(boolean withMaster, DiscoveryNode localNode, long term, long version, + private static ClusterState initialState(DiscoveryNode localNode, long term, long version, VotingConfiguration config) { - ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + return ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder() .add(localNode) - .localNodeId(localNode.getId()) - .masterNodeId(withMaster ? localNode.getId() : null)) + .localNodeId(localNode.getId())) .metaData(MetaData.builder() .coordinationMetaData( CoordinationMetaData.builder() @@ -117,7 +116,6 @@ public class NodeJoinTests extends ESTestCase { .build())) .version(version) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); - return initialClusterState; } private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) { @@ -267,7 +265,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(randomFrom(node0, node1).getId())))); assertFalse(isLocalNodeElectedMaster()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); @@ -286,7 +284,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node1.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); @@ -301,7 +299,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); @@ -315,7 +313,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); @@ -333,7 +331,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node2 = newNode(2, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node2.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); @@ -360,7 +358,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); @@ -379,7 +377,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); @@ -398,7 +396,7 @@ public class NodeJoinTests extends ESTestCase { DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node1.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); @@ -427,7 +425,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); - setupRealMasterServiceAndCoordinator(initialTerm, initialState(false, localNode, initialTerm, initialVersion, votingConfiguration)); + setupRealMasterServiceAndCoordinator(initialTerm, initialState(localNode, initialTerm, initialVersion, votingConfiguration)); long newTerm = initialTerm + randomLongBetween(1, 10); // we need at least a quorum of voting nodes with a correct term and worse state @@ -468,33 +466,35 @@ public class NodeJoinTests extends ESTestCase { possiblyFailingJoinRequests.addAll(randomSubsetOf(possiblyFailingJoinRequests)); CyclicBarrier barrier = new CyclicBarrier(correctJoinRequests.size() + possiblyFailingJoinRequests.size() + 1); - - final AtomicBoolean stopAsserting = new AtomicBoolean(); - final Thread assertionThread = new Thread(() -> { + final Runnable awaitBarrier = () -> { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } + }; + + final AtomicBoolean stopAsserting = new AtomicBoolean(); + final Thread assertionThread = new Thread(() -> { + awaitBarrier.run(); while (stopAsserting.get() == false) { coordinator.invariant(); } }, "assert invariants"); - final List joinThreads = Stream.concat(correctJoinRequests.stream(), possiblyFailingJoinRequests.stream()) - .map(joinRequest -> - new Thread(() -> { - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - try { - joinNode(joinRequest); - } catch (CoordinationStateRejectedException e) { - // ignore - } - }, "process " + joinRequest)).collect(Collectors.toList()); + final List joinThreads = Stream.concat(correctJoinRequests.stream().map(joinRequest -> + new Thread(() -> { + awaitBarrier.run(); + joinNode(joinRequest); + }, "process " + joinRequest)), possiblyFailingJoinRequests.stream().map(joinRequest -> + new Thread(() -> { + awaitBarrier.run(); + try { + joinNode(joinRequest); + } catch (CoordinationStateRejectedException e) { + // ignore - these requests are expected to fail + } + }, "process " + joinRequest))).collect(Collectors.toList()); assertionThread.start(); joinThreads.forEach(Thread::start);