Add test coverage for global checkpoint listeners

This commit adds test coverage for two cases not previously covered by
the existing testing. Namely, we add coverage ensuring that the executor
is used to notify listeners being added that are immediately notified
because the shard is closed or because the global checkpoint is already
beyond what the listener knows.
This commit is contained in:
Jason Tedor 2018-09-11 23:17:26 -04:00
parent 743327efc2
commit 9752540866
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
2 changed files with 53 additions and 3 deletions

View File

@ -97,7 +97,6 @@ public class GlobalCheckpointListeners implements Closeable {
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
// notify directly
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
return;
} else {
if (listeners == null) {
listeners = new ArrayList<>();

View File

@ -336,14 +336,65 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {});
globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
});
}
globalCheckpointListeners.globalCheckpointUpdated(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE));
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners == 0 ? 0 : 1));
}
public void testNotificationOnClosedUsesExecutor() throws IOException {
final AtomicInteger count = new AtomicInteger();
final Executor executor = command -> {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
globalCheckpointListeners.close();
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e.getShardId(), equalTo(shardId));
});
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
}
public void testListenersReadyToBeNotifiedUsesExecutor() {
final AtomicInteger count = new AtomicInteger();
final Executor executor = command -> {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final long globalCheckpoint = randomNonNegativeLong();
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(randomLongBetween(0, globalCheckpoint), (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
});
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
}
public void testConcurrency() throws BrokenBarrierException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);