From bf15decf2056c2b460ee893597fd550bdc96fa43 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Jan 2017 00:21:46 +0100 Subject: [PATCH] flush pending listeners if remote cluster connection is closed --- .../search/RemoteClusterConnection.java | 1 + .../search/RemoteClusterConnectionTests.java | 32 ++++++++++++------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index 0dda9ed2ef6..79381cc87e4 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -410,6 +410,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo cancellableThreads.cancel("connect handler is closed"); running.acquire(); // acquire the semaphore to ensure all connections are closed and all thread joined running.release(); + maybeConnect(); // now go an notify pending listeners } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java index e681a457b01..1e13a6c2585 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java @@ -464,17 +464,27 @@ public class RemoteClusterConnectionTests extends ESTestCase { barrier.await(); CountDownLatch latch = new CountDownLatch(numConnectionAttempts); for (int i = 0; i < numConnectionAttempts; i++) { - AtomicBoolean executed = new AtomicBoolean(false); - ActionListener listener = ActionListener.wrap(x -> { - assertTrue(executed.compareAndSet(false, true)); - latch.countDown();}, x -> { - assertTrue(executed.compareAndSet(false, true)); - latch.countDown(); - if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException) { - // that's fine - } else { - throw new AssertionError(x); - } + AtomicReference executed = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + x -> { + if (executed.compareAndSet(null, new RuntimeException())) { + latch.countDown(); + } else { + throw new AssertionError("shit's been called twice", executed.get()); + } + }, + x -> { + if (executed.compareAndSet(null, new RuntimeException())) { + latch.countDown(); + } else { + throw new AssertionError("shit's been called twice", executed.get()); + } + if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException + || x instanceof CancellableThreads.ExecutionCancelledException) { + // that's fine + } else { + throw new AssertionError(x); + } }); connection.updateSeedNodes(seedNodes, listener); }