From 0d152a54f8b9907e11120ba88356d003f980de5f Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 15 Mar 2019 08:04:08 +0000 Subject: [PATCH] Await all pending activity in testConnectAndDisconnect (#40037) We call `ensureConnections()` to undo the effects of a disruption. However, it is possible that one or more targets are currently CONNECTING and have been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection. If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to any missing nodes. Closes #40030. --- .../cluster/NodeConnectionsService.java | 52 +++++++++++-- .../cluster/NodeConnectionsServiceTests.java | 74 ++++++++++--------- 2 files changed, 86 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index dcb24008a28..f48413824d3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -156,19 +156,45 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { runnables.forEach(Runnable::run); } - /** - * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any - * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection - * attempts have completed. - */ void ensureConnections(Runnable onCompletion) { + // Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have + // been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection. + // If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even + // though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to + // any missing nodes. + awaitPendingActivity(() -> connectDisconnectedTargets(onCompletion)); + } + + private void awaitPendingActivity(Runnable onCompletion) { final List runnables = new ArrayList<>(); synchronized (mutex) { final Collection connectionTargets = targetsByNode.values(); if (connectionTargets.isEmpty()) { runnables.add(onCompletion); } else { - logger.trace("ensuring connections to {}", targetsByNode); + final GroupedActionListener listener = new GroupedActionListener<>( + ActionListener.wrap(onCompletion), connectionTargets.size()); + for (final ConnectionTarget connectionTarget : connectionTargets) { + runnables.add(connectionTarget.awaitCurrentActivity(listener)); + } + } + } + runnables.forEach(Runnable::run); + } + + /** + * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any + * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection + * attempts have completed. + */ + private void connectDisconnectedTargets(Runnable onCompletion) { + final List runnables = new ArrayList<>(); + synchronized (mutex) { + final Collection connectionTargets = targetsByNode.values(); + if (connectionTargets.isEmpty()) { + runnables.add(onCompletion); + } else { + logger.trace("connectDisconnectedTargets: {}", targetsByNode); final GroupedActionListener listener = new GroupedActionListener<>( ActionListener.wrap(onCompletion), connectionTargets.size()); for (final ConnectionTarget connectionTarget : connectionTargets) { @@ -182,7 +208,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { class ConnectionChecker extends AbstractRunnable { protected void doRun() { if (connectionChecker == this) { - ensureConnections(this::scheduleNextCheck); + connectDisconnectedTargets(this::scheduleNextCheck); } } @@ -352,6 +378,18 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { } } + Runnable awaitCurrentActivity(ActionListener listener) { + assert Thread.holdsLock(mutex) : "mutex not held"; + + if (activityType == ActivityType.IDLE) { + return () -> listener.onResponse(null); + } else { + addListener(listener); + return () -> { + }; + } + } + private void addListener(@Nullable ActionListener listener) { assert Thread.holdsLock(mutex) : "mutex not held"; assert activityType != ActivityType.IDLE; diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 02fdc4dc313..0a4ef759cb6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -95,7 +95,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { return builder.build(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030") public void testConnectAndDisconnect() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); @@ -106,46 +105,56 @@ public class NodeConnectionsServiceTests extends ESTestCase { service.ensureConnections(() -> future.onResponse(null)); future.actionGet(); } - }); + }, "reconnection thread"); reconnectionThread.start(); - final List allNodes = generateNodes(); - for (int iteration = 0; iteration < 3; iteration++) { + try { - final boolean isDisrupting = randomBoolean(); - final AtomicBoolean stopDisrupting = new AtomicBoolean(); - final Thread disruptionThread = new Thread(() -> { - while (isDisrupting && stopDisrupting.get() == false) { - transportService.disconnectFromNode(randomFrom(allNodes)); - } - }); - disruptionThread.start(); + final List allNodes = generateNodes(); + for (int iteration = 0; iteration < 3; iteration++) { - final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); - final PlainActionFuture future = new PlainActionFuture<>(); - service.connectToNodes(nodes, () -> future.onResponse(null)); - future.actionGet(); - if (isDisrupting == false) { - assertConnected(nodes); - } - service.disconnectFromNodesExcept(nodes); - - assertTrue(stopDisrupting.compareAndSet(false, true)); - disruptionThread.join(); - - if (randomBoolean()) { - // sometimes do not wait for the disconnections to complete before starting the next connections - if (usually()) { + final boolean isDisrupting = randomBoolean(); + if (isDisrupting == false) { + // if the previous iteration was a disrupting one then there could still be some pending disconnections which would + // prevent us from asserting that all nodes are connected in this iteration without this call. ensureConnections(service); - assertConnectedExactlyToNodes(nodes); - } else { - assertBusy(() -> assertConnectedExactlyToNodes(nodes)); + } + final AtomicBoolean stopDisrupting = new AtomicBoolean(); + final Thread disruptionThread = new Thread(() -> { + while (isDisrupting && stopDisrupting.get() == false) { + transportService.disconnectFromNode(randomFrom(allNodes)); + } + }, "disruption thread " + iteration); + disruptionThread.start(); + + final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture future = new PlainActionFuture<>(); + service.connectToNodes(nodes, () -> future.onResponse(null)); + future.actionGet(); + if (isDisrupting == false) { + assertConnected(nodes); + } + service.disconnectFromNodesExcept(nodes); + + assertTrue(stopDisrupting.compareAndSet(false, true)); + disruptionThread.join(); + + if (randomBoolean()) { + // sometimes do not wait for the disconnections to complete before starting the next connections + if (usually()) { + ensureConnections(service); + assertConnectedExactlyToNodes(nodes); + } else { + assertBusy(() -> assertConnectedExactlyToNodes(nodes)); + } } } + } finally { + assertTrue(stopReconnecting.compareAndSet(false, true)); + reconnectionThread.join(); } - assertTrue(stopReconnecting.compareAndSet(false, true)); - reconnectionThread.join(); + ensureConnections(service); } public void testPeriodicReconnection() { @@ -206,7 +215,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { assertConnectedExactlyToNodes(targetNodes); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030") public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);