Wait for all listeners in checkpoint listeners test

It could be that we try to shutdown the executor pool before all the
listeners have been invoked. It can happen that one was not invoked if
it timed out and was in the process of being notified that it timed out
on the executor. If we do this shutdown then, a listener will be met
with rejected execution exception. To address this, we first wait until
all listeners have been notified (or timed out) before proceeding with
shutting down the executor.

Relates #40970
This commit is contained in:
Jason Tedor 2019-04-09 14:24:54 -04:00
parent ebba9393c1
commit 321f93c4f9
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
1 changed files with 12 additions and 9 deletions

View File

@ -431,7 +431,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assertThat(count.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners));
} }
public void testConcurrency() throws BrokenBarrierException, InterruptedException { public void testConcurrency() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
@ -470,11 +470,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
// sometimes this will notify the listener immediately // sometimes this will notify the listener immediately
globalCheckpointListeners.add( globalCheckpointListeners.add(
globalCheckpoint.get(), globalCheckpoint.get(),
maybeMultipleInvocationProtectingListener((g, e) -> { maybeMultipleInvocationProtectingListener(
if (invocation.compareAndSet(false, true) == false) { (g, e) -> {
throw new IllegalStateException("listener invoked twice"); if (invocation.compareAndSet(false, true) == false) {
} throw new IllegalStateException("listener invoked twice");
}), }
}),
randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
} }
// synchronize ending with the updating thread and the main test thread // synchronize ending with the updating thread and the main test thread
@ -491,11 +492,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
} }
assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0)); assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0));
executor.shutdown(); // wait for all the listeners to be notified
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
for (final AtomicBoolean invocation : invocations) { for (final AtomicBoolean invocation : invocations) {
assertTrue(invocation.get()); assertBusy(() -> assertTrue(invocation.get()));
} }
// now shutdown
executor.shutdown();
assertTrue(executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS));
updatingThread.join(); updatingThread.join();
listenersThread.join(); listenersThread.join();
} }