flush pending listeners if remote cluster connection is closed

This commit is contained in:
Simon Willnauer 2017-01-12 00:21:46 +01:00
parent 00781d24ce
commit bf15decf20
2 changed files with 22 additions and 11 deletions

View File

@ -410,6 +410,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
cancellableThreads.cancel("connect handler is closed");
running.acquire(); // acquire the semaphore to ensure all connections are closed and all thread joined
running.release();
maybeConnect(); // now go an notify pending listeners
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

View File

@ -464,13 +464,23 @@ public class RemoteClusterConnectionTests extends ESTestCase {
barrier.await();
CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
for (int i = 0; i < numConnectionAttempts; i++) {
AtomicBoolean executed = new AtomicBoolean(false);
ActionListener<Void> listener = ActionListener.wrap(x -> {
assertTrue(executed.compareAndSet(false, true));
latch.countDown();}, x -> {
assertTrue(executed.compareAndSet(false, true));
AtomicReference<RuntimeException> executed = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(
x -> {
if (executed.compareAndSet(null, new RuntimeException())) {
latch.countDown();
if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException) {
} else {
throw new AssertionError("shit's been called twice", executed.get());
}
},
x -> {
if (executed.compareAndSet(null, new RuntimeException())) {
latch.countDown();
} else {
throw new AssertionError("shit's been called twice", executed.get());
}
if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException
|| x instanceof CancellableThreads.ExecutionCancelledException) {
// that's fine
} else {
throw new AssertionError(x);