Await all pending activity in testConnectAndDisconnect (#40037)

We call `ensureConnections()` to undo the effects of a disruption. However, it
is possible that one or more targets are currently CONNECTING and have been
since the disruption was active, and that the connection attempt was thwarted
by a concurrent disruption to the connection.  If so, we cannot simply add our
listener to the queue because it will be notified when this CONNECTING activity
completes even though it was disrupted. We must therefore wait for all the
current activity to finish and then go through and reconnect to any missing
nodes.

Closes #40030.
This commit is contained in:
David Turner 2019-03-15 08:04:08 +00:00
parent 8f09c77777
commit 0d152a54f8
2 changed files with 86 additions and 40 deletions

View File

@ -156,19 +156,45 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
runnables.forEach(Runnable::run);
}
/**
* Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any
* nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
* attempts have completed.
*/
void ensureConnections(Runnable onCompletion) {
// Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have
// been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection.
// If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even
// though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to
// any missing nodes.
awaitPendingActivity(() -> connectDisconnectedTargets(onCompletion));
}
private void awaitPendingActivity(Runnable onCompletion) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
if (connectionTargets.isEmpty()) {
runnables.add(onCompletion);
} else {
logger.trace("ensuring connections to {}", targetsByNode);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(onCompletion), connectionTargets.size());
for (final ConnectionTarget connectionTarget : connectionTargets) {
runnables.add(connectionTarget.awaitCurrentActivity(listener));
}
}
}
runnables.forEach(Runnable::run);
}
/**
* Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any
* nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
* attempts have completed.
*/
private void connectDisconnectedTargets(Runnable onCompletion) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
if (connectionTargets.isEmpty()) {
runnables.add(onCompletion);
} else {
logger.trace("connectDisconnectedTargets: {}", targetsByNode);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(onCompletion), connectionTargets.size());
for (final ConnectionTarget connectionTarget : connectionTargets) {
@ -182,7 +208,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
class ConnectionChecker extends AbstractRunnable {
protected void doRun() {
if (connectionChecker == this) {
ensureConnections(this::scheduleNextCheck);
connectDisconnectedTargets(this::scheduleNextCheck);
}
}
@ -352,6 +378,18 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
}
}
Runnable awaitCurrentActivity(ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";
if (activityType == ActivityType.IDLE) {
return () -> listener.onResponse(null);
} else {
addListener(listener);
return () -> {
};
}
}
private void addListener(@Nullable ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";
assert activityType != ActivityType.IDLE;

View File

@ -95,7 +95,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return builder.build();
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030")
public void testConnectAndDisconnect() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
@ -106,19 +105,26 @@ public class NodeConnectionsServiceTests extends ESTestCase {
service.ensureConnections(() -> future.onResponse(null));
future.actionGet();
}
});
}, "reconnection thread");
reconnectionThread.start();
try {
final List<DiscoveryNode> allNodes = generateNodes();
for (int iteration = 0; iteration < 3; iteration++) {
final boolean isDisrupting = randomBoolean();
if (isDisrupting == false) {
// if the previous iteration was a disrupting one then there could still be some pending disconnections which would
// prevent us from asserting that all nodes are connected in this iteration without this call.
ensureConnections(service);
}
final AtomicBoolean stopDisrupting = new AtomicBoolean();
final Thread disruptionThread = new Thread(() -> {
while (isDisrupting && stopDisrupting.get() == false) {
transportService.disconnectFromNode(randomFrom(allNodes));
}
});
}, "disruption thread " + iteration);
disruptionThread.start();
final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes));
@ -143,11 +149,14 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
}
}
} finally {
assertTrue(stopReconnecting.compareAndSet(false, true));
reconnectionThread.join();
}
ensureConnections(service);
}
public void testPeriodicReconnection() {
final Settings.Builder settings = Settings.builder();
final long reconnectIntervalMillis;
@ -206,7 +215,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
assertConnectedExactlyToNodes(targetNodes);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40030")
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);