From 321f93c4f95255536ae85cd92f9a15a79e48674e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 9 Apr 2019 14:24:54 -0400 Subject: [PATCH] Wait for all listeners in checkpoint listeners test It could be that we try to shutdown the executor pool before all the listeners have been invoked. It can happen that one was not invoked if it timed out and was in the process of being notified that it timed out on the executor. If we do this shutdown then, a listener will be met with rejected execution exception. To address this, we first wait until all listeners have been notified (or timed out) before proceeding with shutting down the executor. Relates #40970 --- .../shard/GlobalCheckpointListenersTests.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 59c3553d25f..d71bade29a3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -431,7 +431,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { assertThat(count.get(), equalTo(numberOfListeners)); } - public void testConcurrency() throws BrokenBarrierException, InterruptedException { + public void testConcurrency() throws Exception { final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); @@ -470,11 +470,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase { // sometimes this will notify the listener immediately globalCheckpointListeners.add( globalCheckpoint.get(), - maybeMultipleInvocationProtectingListener((g, e) -> { - if (invocation.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - }), + maybeMultipleInvocationProtectingListener( + (g, e) -> { + if (invocation.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + }), randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); } // synchronize ending with the updating thread and the main test thread @@ -491,11 +492,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase { globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); } assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0)); - executor.shutdown(); - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + // wait for all the listeners to be notified for (final AtomicBoolean invocation : invocations) { - assertTrue(invocation.get()); + assertBusy(() -> assertTrue(invocation.get())); } + // now shutdown + executor.shutdown(); + assertTrue(executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)); updatingThread.join(); listenersThread.join(); }