diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index dcb24008a28..f48413824d3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -156,19 +156,45 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { runnables.forEach(Runnable::run); } - /** - * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any - * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection - * attempts have completed. - */ void ensureConnections(Runnable onCompletion) { + // Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have + // been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection. + // If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even + // though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to + // any missing nodes. + awaitPendingActivity(() -> connectDisconnectedTargets(onCompletion)); + } + + private void awaitPendingActivity(Runnable onCompletion) { final List runnables = new ArrayList<>(); synchronized (mutex) { final Collection connectionTargets = targetsByNode.values(); if (connectionTargets.isEmpty()) { runnables.add(onCompletion); } else { - logger.trace("ensuring connections to {}", targetsByNode); + final GroupedActionListener listener = new GroupedActionListener<>( + ActionListener.wrap(onCompletion), connectionTargets.size()); + for (final ConnectionTarget connectionTarget : connectionTargets) { + runnables.add(connectionTarget.awaitCurrentActivity(listener)); + } + } + } + runnables.forEach(Runnable::run); + } + + /** + * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any + * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection + * attempts have completed. + */ + private void connectDisconnectedTargets(Runnable onCompletion) { + final List runnables = new ArrayList<>(); + synchronized (mutex) { + final Collection connectionTargets = targetsByNode.values(); + if (connectionTargets.isEmpty()) { + runnables.add(onCompletion); + } else { + logger.trace("connectDisconnectedTargets: {}", targetsByNode); final GroupedActionListener listener = new GroupedActionListener<>( ActionListener.wrap(onCompletion), connectionTargets.size()); for (final ConnectionTarget connectionTarget : connectionTargets) { @@ -182,7 +208,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { class ConnectionChecker extends AbstractRunnable { protected void doRun() { if (connectionChecker == this) { - ensureConnections(this::scheduleNextCheck); + connectDisconnectedTargets(this::scheduleNextCheck); } } @@ -352,6 +378,18 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { } } + Runnable awaitCurrentActivity(ActionListener listener) { + assert Thread.holdsLock(mutex) : "mutex not held"; + + if (activityType == ActivityType.IDLE) { + return () -> listener.onResponse(null); + } else { + addListener(listener); + return () -> { + }; + } + } + private void addListener(@Nullable ActionListener listener) { assert Thread.holdsLock(mutex) : "mutex not held"; assert activityType != ActivityType.IDLE; diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 02fdc4dc313..0a4ef759cb6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -95,7 +95,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { return builder.build(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030") public void testConnectAndDisconnect() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); @@ -106,46 +105,56 @@ public class NodeConnectionsServiceTests extends ESTestCase { service.ensureConnections(() -> future.onResponse(null)); future.actionGet(); } - }); + }, "reconnection thread"); reconnectionThread.start(); - final List allNodes = generateNodes(); - for (int iteration = 0; iteration < 3; iteration++) { + try { - final boolean isDisrupting = randomBoolean(); - final AtomicBoolean stopDisrupting = new AtomicBoolean(); - final Thread disruptionThread = new Thread(() -> { - while (isDisrupting && stopDisrupting.get() == false) { - transportService.disconnectFromNode(randomFrom(allNodes)); - } - }); - disruptionThread.start(); + final List allNodes = generateNodes(); + for (int iteration = 0; iteration < 3; iteration++) { - final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); - final PlainActionFuture future = new PlainActionFuture<>(); - service.connectToNodes(nodes, () -> future.onResponse(null)); - future.actionGet(); - if (isDisrupting == false) { - assertConnected(nodes); - } - service.disconnectFromNodesExcept(nodes); - - assertTrue(stopDisrupting.compareAndSet(false, true)); - disruptionThread.join(); - - if (randomBoolean()) { - // sometimes do not wait for the disconnections to complete before starting the next connections - if (usually()) { + final boolean isDisrupting = randomBoolean(); + if (isDisrupting == false) { + // if the previous iteration was a disrupting one then there could still be some pending disconnections which would + // prevent us from asserting that all nodes are connected in this iteration without this call. ensureConnections(service); - assertConnectedExactlyToNodes(nodes); - } else { - assertBusy(() -> assertConnectedExactlyToNodes(nodes)); + } + final AtomicBoolean stopDisrupting = new AtomicBoolean(); + final Thread disruptionThread = new Thread(() -> { + while (isDisrupting && stopDisrupting.get() == false) { + transportService.disconnectFromNode(randomFrom(allNodes)); + } + }, "disruption thread " + iteration); + disruptionThread.start(); + + final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture future = new PlainActionFuture<>(); + service.connectToNodes(nodes, () -> future.onResponse(null)); + future.actionGet(); + if (isDisrupting == false) { + assertConnected(nodes); + } + service.disconnectFromNodesExcept(nodes); + + assertTrue(stopDisrupting.compareAndSet(false, true)); + disruptionThread.join(); + + if (randomBoolean()) { + // sometimes do not wait for the disconnections to complete before starting the next connections + if (usually()) { + ensureConnections(service); + assertConnectedExactlyToNodes(nodes); + } else { + assertBusy(() -> assertConnectedExactlyToNodes(nodes)); + } } } + } finally { + assertTrue(stopReconnecting.compareAndSet(false, true)); + reconnectionThread.join(); } - assertTrue(stopReconnecting.compareAndSet(false, true)); - reconnectionThread.join(); + ensureConnections(service); } public void testPeriodicReconnection() { @@ -206,7 +215,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { assertConnectedExactlyToNodes(targetNodes); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030") public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);