Fix test concurrent remote connection updates
This test has a race condition. The action listener used to listen for connections has a guard against being executed twice. However, this listener can be executed twice. After on success is invoked the test starts to tear down. At this point, the threads the test forked will terminate and the remote cluster connection will be closed. However, a thread forked to the management thread pool by the remote cluster connection can still be executing and try to continue connecting. This thread will be cancelled when the remote cluster connection is closed and this leads to the action listener being invoked again. To address this, we explicitly check that the reason that on failure was invoked was cancellation, and we assert that the listener was already previously invoked. Interestingly, this issue has always been present yet a recent change (#28667) exposed errors that occur on tasks submitted to the thread pool and were silently being lost. Relates #28695
This commit is contained in:
parent
1fa701c18d
commit
57a56d8e64
|
@ -557,7 +557,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/28695")
|
||||
public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
|
@ -591,17 +590,28 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
|
||||
for (int i = 0; i < numConnectionAttempts; i++) {
|
||||
AtomicBoolean executed = new AtomicBoolean(false);
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> {
|
||||
assertTrue(executed.compareAndSet(false, true));
|
||||
latch.countDown();}, x -> {
|
||||
assertTrue(executed.compareAndSet(false, true));
|
||||
latch.countDown();
|
||||
if (x instanceof RejectedExecutionException) {
|
||||
// that's fine
|
||||
} else {
|
||||
throw new AssertionError(x);
|
||||
}
|
||||
});
|
||||
ActionListener<Void> listener = ActionListener.wrap(
|
||||
x -> {
|
||||
assertTrue(executed.compareAndSet(false, true));
|
||||
latch.countDown();},
|
||||
x -> {
|
||||
/*
|
||||
* This can occur on a thread submitted to the thread pool while we are closing the
|
||||
* remote cluster connection at the end of the test.
|
||||
*/
|
||||
if (x instanceof CancellableThreads.ExecutionCancelledException) {
|
||||
// we should already be shutting down
|
||||
assertTrue(executed.get());
|
||||
return;
|
||||
}
|
||||
|
||||
assertTrue(executed.compareAndSet(false, true));
|
||||
latch.countDown();
|
||||
|
||||
if (!(x instanceof RejectedExecutionException)) {
|
||||
throw new AssertionError(x);
|
||||
}
|
||||
});
|
||||
connection.updateSeedNodes(seedNodes, listener);
|
||||
}
|
||||
latch.await();
|
||||
|
|
Loading…
Reference in New Issue