From 39191331d19e00312f0de2db850fd72c92dafdee Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 14 Sep 2018 09:32:03 -0400 Subject: [PATCH] Only notify ready global checkpoint listeners (#33690) When we add a global checkpoint listener, it is also carries along with it a value that it thinks is the current global checkpoint. This value can be above the actual global checkpoint on a shard if the listener knows the global checkpoint from another shard copy (e.g., the primary), and the current shard copy is lagging behind. Today we notify the listener whenever the global checkpoint advances, regardless if it goes above the current global checkpoint known to the listener. This commit reworks this implementation. Rather than thinking of the value associated with the listener as the current global checkpoint known to the listener, we think of it as the value that the listener is waiting for the global checkpoint to advance to (inclusive). Now instead of notifying all waiting listeners when the global checkpoint advances, we only notify those that are waiting for a value not larger than the actual global checkpoint that we advanced to. --- .../shard/GlobalCheckpointListeners.java | 130 ++++++------ .../elasticsearch/index/shard/IndexShard.java | 17 +- .../shard/GlobalCheckpointListenersTests.java | 197 ++++++++++-------- .../index/shard/IndexShardIT.java | 8 +- 4 files changed, 186 insertions(+), 166 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 224d5be17e1..bedd1654449 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -21,11 +21,13 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -34,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -63,7 +66,7 @@ public class GlobalCheckpointListeners implements Closeable { // guarded by this private boolean closed; - private Map> listeners; + private final Map>> listeners = new LinkedHashMap<>(); private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; @@ -91,62 +94,56 @@ public class GlobalCheckpointListeners implements Closeable { } /** - * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the - * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global - * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard - * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be - * notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for - * the timeout means no timeout will be associated to the listener. + * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, + * then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. + * If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global + * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated above the + * global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to + * receive subsequent events. Callers may add a timeout to be notified after if the timeout elapses. In this case, the listener will be + * notified with a {@link TimeoutException}. Passing null fo the timeout means no timeout will be associated to the listener. * - * @param currentGlobalCheckpoint the current global checkpoint known to the listener - * @param listener the listener - * @param timeout the listener timeout, or null if no timeout + * @param waitingForGlobalCheckpoint the current global checkpoint known to the listener + * @param listener the listener + * @param timeout the listener timeout, or null if no timeout */ - synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { + synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { if (closed) { executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); return; } - if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { + if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) { // notify directly executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); } else { - if (listeners == null) { - listeners = new LinkedHashMap<>(); - } if (timeout == null) { - listeners.put(listener, null); + listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null)); } else { listeners.put( listener, - scheduler.schedule( - () -> { - final boolean removed; - synchronized (this) { - /* - * Note that the listeners map can be null if a notification nulled out the map reference when - * notifying listeners, and then our scheduled execution occurred before we could be cancelled by - * the notification. In this case, we would have blocked waiting for access to this critical - * section. - * - * What is more, we know that this listener has a timeout associated with it (otherwise we would - * not be here) so the return value from remove being null is an indication that we are not in the - * map. This can happen if a notification nulled out the listeners, and then our scheduled execution - * occurred before we could be cancelled by the notification, and then another thread added a - * listener causing the listeners map reference to be non-null again. In this case, our listener - * here would not be in the map and we should not fire the timeout logic. - */ - removed = listeners != null && listeners.remove(listener) != null; - } - if (removed) { - final TimeoutException e = new TimeoutException(timeout.getStringRep()); - logger.trace("global checkpoint listener timed out", e); - executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); - } - }, - timeout.nanos(), - TimeUnit.NANOSECONDS)); + Tuple.tuple( + waitingForGlobalCheckpoint, + scheduler.schedule( + () -> { + final boolean removed; + synchronized (this) { + /* + * We know that this listener has a timeout associated with it (otherwise we would not be + * here) so the future component of the return value from remove being null is an indication + * that we are not in the map. This can happen if a notification collected us into listeners + * to be notified and removed us from the map, and then our scheduled execution occurred + * before we could be cancelled by the notification. In this case, our listener here would + * not be in the map and we should not fire the timeout logic. + */ + removed = listeners.remove(listener).v2() != null; + } + if (removed) { + final TimeoutException e = new TimeoutException(timeout.getStringRep()); + logger.trace("global checkpoint listener timed out", e); + executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); + } + }, + timeout.nanos(), + TimeUnit.NANOSECONDS))); } } } @@ -163,7 +160,7 @@ public class GlobalCheckpointListeners implements Closeable { * @return the number of listeners pending notification */ synchronized int pendingListeners() { - return listeners == null ? 0 : listeners.size(); + return listeners.size(); } /** @@ -173,7 +170,7 @@ public class GlobalCheckpointListeners implements Closeable { * @return a scheduled future representing the timeout future for the listener, otherwise null */ synchronized ScheduledFuture getTimeoutFuture(final GlobalCheckpointListener listener) { - return listeners.get(listener); + return listeners.get(listener).v2(); } /** @@ -193,22 +190,31 @@ public class GlobalCheckpointListeners implements Closeable { private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { assert Thread.holdsLock(this); assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); - if (listeners != null) { - // capture the current listeners - final Map> currentListeners = listeners; - listeners = null; - if (currentListeners != null) { - executor.execute(() -> { - for (final Map.Entry> listener : currentListeners.entrySet()) { - /* - * We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and - * not trigger the timeout. - */ - FutureUtils.cancel(listener.getValue()); - notifyListener(listener.getKey(), globalCheckpoint, e); - } - }); - } + + final Map>> listenersToNotify; + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + listenersToNotify = + listeners + .entrySet() + .stream() + .filter(entry -> entry.getValue().v1() <= globalCheckpoint) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + listenersToNotify.keySet().forEach(listeners::remove); + } else { + listenersToNotify = new HashMap<>(listeners); + listeners.clear(); + } + if (listenersToNotify.isEmpty() == false) { + executor.execute(() -> + listenersToNotify + .forEach((listener, t) -> { + /* + * We do not want to interrupt any timeouts that fired, these will detect that the listener has been + * notified and not trigger the timeout. + */ + FutureUtils.cancel(t.v2()); + notifyListener(listener, globalCheckpoint, e); + })); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 91d87b00082..168444a2267 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1781,19 +1781,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener - * will be notified with an {@link TimeoutException}. A caller may pass null to specify no timeout. + * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, + * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout + * elapses before the listener is notified, the listener will be notified with an {@link TimeoutException}. A caller may pass null to + * specify no timeout. * - * @param currentGlobalCheckpoint the current global checkpoint known to the listener - * @param listener the listener - * @param timeout the timeout + * @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for + * @param listener the listener + * @param timeout the timeout */ public void addGlobalCheckpointListener( - final long currentGlobalCheckpoint, + final long waitingForGlobalCheckpoint, final GlobalCheckpointListeners.GlobalCheckpointListener listener, final TimeValue timeout) { - this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout); + this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 4ab278cc02a..fa0e0cee143 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Assertions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -31,7 +32,9 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -76,62 +79,70 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); - final int numberOfListeners = randomIntBetween(0, 16); - final long[] globalCheckpoints = new long[numberOfListeners]; + final int numberOfListeners = randomIntBetween(0, 64); + final Map listeners = new HashMap<>(); + final Map notifiedListeners = new HashMap<>(); for (int i = 0; i < numberOfListeners; i++) { - final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert g != UNASSIGNED_SEQ_NO; - assert e == null; - globalCheckpoints[index] = g; - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() { + @Override + public void accept(final long g, final Exception e) { + notifiedListeners.put(this, g); + } + }; + final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + listeners.put(listener, waitingGlobalCheckpoint); + globalCheckpointListeners.add(waitingGlobalCheckpoint, maybeMultipleInvocationProtectingListener(listener), null); } - final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() <= globalCheckpoint) { + // only listeners waiting on a lower global checkpoint will have been notified + assertThat(notifiedListeners.get(listener.getKey()), equalTo(globalCheckpoint)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } // test the listeners are not invoked twice - final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE); + notifiedListeners.clear(); + final long nextGlobalCheckpoint = randomLongBetween(1 + globalCheckpoint, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() > globalCheckpoint && listener.getValue() <= nextGlobalCheckpoint) { + // these listeners will have been notified by the second global checkpoint update, and all the other listeners should not be + assertThat(notifiedListeners.get(listener.getKey()), equalTo(nextGlobalCheckpoint)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } // closing should also not notify the listeners + notifiedListeners.clear(); globalCheckpointListeners.close(); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() > nextGlobalCheckpoint) { + // these listeners should have been notified that we closed, and all the other listeners should not be + assertThat(notifiedListeners.get(listener.getKey()), equalTo(UNASSIGNED_SEQ_NO)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } } public void testListenersReadyToBeNotified() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); - final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); + final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert g != UNASSIGNED_SEQ_NO; - assert e == null; - globalCheckpoints[index] = g; - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); + globalCheckpointListeners.add( + randomLongBetween(0, globalCheckpoint), + maybeMultipleInvocationProtectingListener((g, e) -> globalCheckpoints[index] = g), + null); // the listener should be notified immediately assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); } @@ -161,18 +172,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { final int index = i; final boolean failure = randomBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert globalCheckpoint != UNASSIGNED_SEQ_NO; - assert e == null; + globalCheckpointListeners.add( + randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { globalCheckpoints[index] = Long.MIN_VALUE; throw new RuntimeException("failure"); } else { globalCheckpoints[index] = globalCheckpoint; } - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); + }), + null); // the listener should be notified immediately if (failure) { assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); @@ -202,17 +212,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final Exception[] exceptions = new Exception[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (globalCheckpoint, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert globalCheckpoint == UNASSIGNED_SEQ_NO; - assert e != null; - exceptions[index] = e; - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + globalCheckpointListeners.add( + 0, maybeMultipleInvocationProtectingListener((g, e) -> exceptions[index] = e), null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -238,16 +239,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase { globalCheckpointListeners.close(); final AtomicBoolean invoked = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> { - assert g == UNASSIGNED_SEQ_NO; - assert e != null; - if (invoked.compareAndSet(false, true) == false) { - latch.countDown(); - throw new IllegalStateException("listener invoked twice"); - } - latch.countDown(); - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener, null); + globalCheckpointListeners.add( + randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), + maybeMultipleInvocationProtectingListener((g, e) -> { + invoked.set(true); + latch.countDown(); + }), + null); latch.await(); assertTrue(invoked.get()); } @@ -264,18 +262,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int index = i; final boolean failure = randomBoolean(); failures[index] = failure; - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert g != UNASSIGNED_SEQ_NO; - assert e == null; + globalCheckpointListeners.add( + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { globalCheckpoints[index] = Long.MIN_VALUE; throw new RuntimeException("failure"); } else { globalCheckpoints[index] = g; } - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + }), + null); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -319,17 +316,16 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int index = i; final boolean failure = randomBoolean(); failures[index] = failure; - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert g == UNASSIGNED_SEQ_NO; - assert e != null; + globalCheckpointListeners.add( + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { throw new RuntimeException("failure"); } else { exceptions[index] = e; } - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + }), + null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -370,12 +366,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( - NO_OPS_PERFORMED, - (g, e) -> { + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(globalCheckpoint)); assertNull(e); - }, + }), null); } globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -396,13 +392,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertNotNull(e); assertThat(e, instanceOf(IndexShardClosedException.class)); assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); - }, + }), null); } assertThat(notified.get(), equalTo(numberOfListeners)); @@ -423,11 +419,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( randomLongBetween(0, globalCheckpoint), - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(globalCheckpoint)); assertNull(e); - }, null); + }), + null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -472,11 +469,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase { // sometimes this will notify the listener immediately globalCheckpointListeners.add( globalCheckpoint.get(), - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { if (invocation.compareAndSet(false, true) == false) { throw new IllegalStateException("listener invoked twice"); } - }, + }), randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); } // synchronize ending with the updating thread and the main test thread @@ -511,7 +508,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final CountDownLatch latch = new CountDownLatch(1); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -527,7 +524,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { } finally { latch.countDown(); } - }, + }), timeout); latch.await(); @@ -546,7 +543,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final CountDownLatch latch = new CountDownLatch(1); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -554,7 +551,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { } finally { latch.countDown(); } - }, + }), timeout); latch.await(); // ensure the listener notification occurred on the executor @@ -574,9 +571,9 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { throw new RuntimeException("failure"); - }, + }), timeout); latch.await(); final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); @@ -592,10 +589,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE); - final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = (g, e) -> { - assertThat(g, equalTo(NO_OPS_PERFORMED)); - assertNull(e); - }; + final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = + maybeMultipleInvocationProtectingListener((g, e) -> { + assertThat(g, equalTo(NO_OPS_PERFORMED)); + assertNull(e); + }); globalCheckpointListeners.add(NO_OPS_PERFORMED, globalCheckpointListener, timeout); final ScheduledFuture future = globalCheckpointListeners.getTimeoutFuture(globalCheckpointListener); assertNotNull(future); @@ -603,6 +601,21 @@ public class GlobalCheckpointListenersTests extends ESTestCase { assertTrue(future.isCancelled()); } + private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener( + final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) { + if (Assertions.ENABLED) { + final AtomicBoolean invoked = new AtomicBoolean(); + return (g, e) -> { + if (invoked.compareAndSet(false, true) == false) { + throw new AssertionError("listener invoked twice"); + } + globalCheckpointListener.accept(g, e); + }; + } else { + return globalCheckpointListener; + } + } + private void awaitQuietly(final CyclicBarrier barrier) { try { barrier.await(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2c659ac60ec..56a14da845f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -747,7 +747,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { final int index = i; final AtomicLong globalCheckpoint = new AtomicLong(); shard.addGlobalCheckpointListener( - i - 1, + i, (g, e) -> { assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertNull(e); @@ -759,7 +759,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { // adding a listener expecting a lower global checkpoint should fire immediately final AtomicLong immediateGlobalCheckpint = new AtomicLong(); shard.addGlobalCheckpointListener( - randomLongBetween(NO_OPS_PERFORMED, i - 1), + randomLongBetween(0, i), (g, e) -> { assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertNull(e); @@ -770,7 +770,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { } final AtomicBoolean invoked = new AtomicBoolean(); shard.addGlobalCheckpointListener( - numberOfUpdates - 1, + numberOfUpdates, (g, e) -> { invoked.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -792,7 +792,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { final CountDownLatch latch = new CountDownLatch(1); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); shard.addGlobalCheckpointListener( - NO_OPS_PERFORMED, + 0, (g, e) -> { try { notified.set(true);