diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 224d5be17e1..bedd1654449 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -21,11 +21,13 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -34,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -63,7 +66,7 @@ public class GlobalCheckpointListeners implements Closeable { // guarded by this private boolean closed; - private Map> listeners; + private final Map>> listeners = new LinkedHashMap<>(); private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; @@ -91,62 +94,56 @@ public class GlobalCheckpointListeners implements Closeable { } /** - * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the - * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global - * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard - * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be - * notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for - * the timeout means no timeout will be associated to the listener. + * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, + * then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. + * If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global + * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated above the + * global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to + * receive subsequent events. Callers may add a timeout to be notified after if the timeout elapses. In this case, the listener will be + * notified with a {@link TimeoutException}. Passing null fo the timeout means no timeout will be associated to the listener. * - * @param currentGlobalCheckpoint the current global checkpoint known to the listener - * @param listener the listener - * @param timeout the listener timeout, or null if no timeout + * @param waitingForGlobalCheckpoint the current global checkpoint known to the listener + * @param listener the listener + * @param timeout the listener timeout, or null if no timeout */ - synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { + synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { if (closed) { executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); return; } - if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { + if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) { // notify directly executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); } else { - if (listeners == null) { - listeners = new LinkedHashMap<>(); - } if (timeout == null) { - listeners.put(listener, null); + listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null)); } else { listeners.put( listener, - scheduler.schedule( - () -> { - final boolean removed; - synchronized (this) { - /* - * Note that the listeners map can be null if a notification nulled out the map reference when - * notifying listeners, and then our scheduled execution occurred before we could be cancelled by - * the notification. In this case, we would have blocked waiting for access to this critical - * section. - * - * What is more, we know that this listener has a timeout associated with it (otherwise we would - * not be here) so the return value from remove being null is an indication that we are not in the - * map. This can happen if a notification nulled out the listeners, and then our scheduled execution - * occurred before we could be cancelled by the notification, and then another thread added a - * listener causing the listeners map reference to be non-null again. In this case, our listener - * here would not be in the map and we should not fire the timeout logic. - */ - removed = listeners != null && listeners.remove(listener) != null; - } - if (removed) { - final TimeoutException e = new TimeoutException(timeout.getStringRep()); - logger.trace("global checkpoint listener timed out", e); - executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); - } - }, - timeout.nanos(), - TimeUnit.NANOSECONDS)); + Tuple.tuple( + waitingForGlobalCheckpoint, + scheduler.schedule( + () -> { + final boolean removed; + synchronized (this) { + /* + * We know that this listener has a timeout associated with it (otherwise we would not be + * here) so the future component of the return value from remove being null is an indication + * that we are not in the map. This can happen if a notification collected us into listeners + * to be notified and removed us from the map, and then our scheduled execution occurred + * before we could be cancelled by the notification. In this case, our listener here would + * not be in the map and we should not fire the timeout logic. + */ + removed = listeners.remove(listener).v2() != null; + } + if (removed) { + final TimeoutException e = new TimeoutException(timeout.getStringRep()); + logger.trace("global checkpoint listener timed out", e); + executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); + } + }, + timeout.nanos(), + TimeUnit.NANOSECONDS))); } } } @@ -163,7 +160,7 @@ public class GlobalCheckpointListeners implements Closeable { * @return the number of listeners pending notification */ synchronized int pendingListeners() { - return listeners == null ? 0 : listeners.size(); + return listeners.size(); } /** @@ -173,7 +170,7 @@ public class GlobalCheckpointListeners implements Closeable { * @return a scheduled future representing the timeout future for the listener, otherwise null */ synchronized ScheduledFuture getTimeoutFuture(final GlobalCheckpointListener listener) { - return listeners.get(listener); + return listeners.get(listener).v2(); } /** @@ -193,22 +190,31 @@ public class GlobalCheckpointListeners implements Closeable { private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { assert Thread.holdsLock(this); assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); - if (listeners != null) { - // capture the current listeners - final Map> currentListeners = listeners; - listeners = null; - if (currentListeners != null) { - executor.execute(() -> { - for (final Map.Entry> listener : currentListeners.entrySet()) { - /* - * We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and - * not trigger the timeout. - */ - FutureUtils.cancel(listener.getValue()); - notifyListener(listener.getKey(), globalCheckpoint, e); - } - }); - } + + final Map>> listenersToNotify; + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + listenersToNotify = + listeners + .entrySet() + .stream() + .filter(entry -> entry.getValue().v1() <= globalCheckpoint) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + listenersToNotify.keySet().forEach(listeners::remove); + } else { + listenersToNotify = new HashMap<>(listeners); + listeners.clear(); + } + if (listenersToNotify.isEmpty() == false) { + executor.execute(() -> + listenersToNotify + .forEach((listener, t) -> { + /* + * We do not want to interrupt any timeouts that fired, these will detect that the listener has been + * notified and not trigger the timeout. + */ + FutureUtils.cancel(t.v2()); + notifyListener(listener, globalCheckpoint, e); + })); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 91d87b00082..168444a2267 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1781,19 +1781,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener - * will be notified with an {@link TimeoutException}. A caller may pass null to specify no timeout. + * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, + * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout + * elapses before the listener is notified, the listener will be notified with an {@link TimeoutException}. A caller may pass null to + * specify no timeout. * - * @param currentGlobalCheckpoint the current global checkpoint known to the listener - * @param listener the listener - * @param timeout the timeout + * @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for + * @param listener the listener + * @param timeout the timeout */ public void addGlobalCheckpointListener( - final long currentGlobalCheckpoint, + final long waitingForGlobalCheckpoint, final GlobalCheckpointListeners.GlobalCheckpointListener listener, final TimeValue timeout) { - this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout); + this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 4ab278cc02a..fa0e0cee143 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Assertions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -31,7 +32,9 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -76,62 +79,70 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); - final int numberOfListeners = randomIntBetween(0, 16); - final long[] globalCheckpoints = new long[numberOfListeners]; + final int numberOfListeners = randomIntBetween(0, 64); + final Map listeners = new HashMap<>(); + final Map notifiedListeners = new HashMap<>(); for (int i = 0; i < numberOfListeners; i++) { - final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert g != UNASSIGNED_SEQ_NO; - assert e == null; - globalCheckpoints[index] = g; - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() { + @Override + public void accept(final long g, final Exception e) { + notifiedListeners.put(this, g); + } + }; + final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + listeners.put(listener, waitingGlobalCheckpoint); + globalCheckpointListeners.add(waitingGlobalCheckpoint, maybeMultipleInvocationProtectingListener(listener), null); } - final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() <= globalCheckpoint) { + // only listeners waiting on a lower global checkpoint will have been notified + assertThat(notifiedListeners.get(listener.getKey()), equalTo(globalCheckpoint)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } // test the listeners are not invoked twice - final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE); + notifiedListeners.clear(); + final long nextGlobalCheckpoint = randomLongBetween(1 + globalCheckpoint, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() > globalCheckpoint && listener.getValue() <= nextGlobalCheckpoint) { + // these listeners will have been notified by the second global checkpoint update, and all the other listeners should not be + assertThat(notifiedListeners.get(listener.getKey()), equalTo(nextGlobalCheckpoint)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } // closing should also not notify the listeners + notifiedListeners.clear(); globalCheckpointListeners.close(); - for (int i = 0; i < numberOfListeners; i++) { - assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); + for (final Map.Entry listener : listeners.entrySet()) { + if (listener.getValue() > nextGlobalCheckpoint) { + // these listeners should have been notified that we closed, and all the other listeners should not be + assertThat(notifiedListeners.get(listener.getKey()), equalTo(UNASSIGNED_SEQ_NO)); + } else { + assertNull(notifiedListeners.get(listener.getKey())); + } } } public void testListenersReadyToBeNotified() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); - final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); + final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert g != UNASSIGNED_SEQ_NO; - assert e == null; - globalCheckpoints[index] = g; - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); + globalCheckpointListeners.add( + randomLongBetween(0, globalCheckpoint), + maybeMultipleInvocationProtectingListener((g, e) -> globalCheckpoints[index] = g), + null); // the listener should be notified immediately assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); } @@ -161,18 +172,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { final int index = i; final boolean failure = randomBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert globalCheckpoint != UNASSIGNED_SEQ_NO; - assert e == null; + globalCheckpointListeners.add( + randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { globalCheckpoints[index] = Long.MIN_VALUE; throw new RuntimeException("failure"); } else { globalCheckpoints[index] = globalCheckpoint; } - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); + }), + null); // the listener should be notified immediately if (failure) { assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); @@ -202,17 +212,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final Exception[] exceptions = new Exception[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; - final AtomicBoolean invoked = new AtomicBoolean(); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (globalCheckpoint, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - assert globalCheckpoint == UNASSIGNED_SEQ_NO; - assert e != null; - exceptions[index] = e; - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + globalCheckpointListeners.add( + 0, maybeMultipleInvocationProtectingListener((g, e) -> exceptions[index] = e), null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -238,16 +239,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase { globalCheckpointListeners.close(); final AtomicBoolean invoked = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); - final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> { - assert g == UNASSIGNED_SEQ_NO; - assert e != null; - if (invoked.compareAndSet(false, true) == false) { - latch.countDown(); - throw new IllegalStateException("listener invoked twice"); - } - latch.countDown(); - }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener, null); + globalCheckpointListeners.add( + randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), + maybeMultipleInvocationProtectingListener((g, e) -> { + invoked.set(true); + latch.countDown(); + }), + null); latch.await(); assertTrue(invoked.get()); } @@ -264,18 +262,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int index = i; final boolean failure = randomBoolean(); failures[index] = failure; - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert g != UNASSIGNED_SEQ_NO; - assert e == null; + globalCheckpointListeners.add( + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { globalCheckpoints[index] = Long.MIN_VALUE; throw new RuntimeException("failure"); } else { globalCheckpoints[index] = g; } - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + }), + null); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -319,17 +316,16 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int index = i; final boolean failure = randomBoolean(); failures[index] = failure; - final GlobalCheckpointListeners.GlobalCheckpointListener listener = - (g, e) -> { - assert g == UNASSIGNED_SEQ_NO; - assert e != null; + globalCheckpointListeners.add( + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { if (failure) { throw new RuntimeException("failure"); } else { exceptions[index] = e; } - }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); + }), + null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -370,12 +366,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( - NO_OPS_PERFORMED, - (g, e) -> { + 0, + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(globalCheckpoint)); assertNull(e); - }, + }), null); } globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -396,13 +392,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertNotNull(e); assertThat(e, instanceOf(IndexShardClosedException.class)); assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); - }, + }), null); } assertThat(notified.get(), equalTo(numberOfListeners)); @@ -423,11 +419,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase { for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( randomLongBetween(0, globalCheckpoint), - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { notified.incrementAndGet(); assertThat(g, equalTo(globalCheckpoint)); assertNull(e); - }, null); + }), + null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -472,11 +469,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase { // sometimes this will notify the listener immediately globalCheckpointListeners.add( globalCheckpoint.get(), - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { if (invocation.compareAndSet(false, true) == false) { throw new IllegalStateException("listener invoked twice"); } - }, + }), randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); } // synchronize ending with the updating thread and the main test thread @@ -511,7 +508,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final CountDownLatch latch = new CountDownLatch(1); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -527,7 +524,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { } finally { latch.countDown(); } - }, + }), timeout); latch.await(); @@ -546,7 +543,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final CountDownLatch latch = new CountDownLatch(1); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -554,7 +551,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { } finally { latch.countDown(); } - }, + }), timeout); latch.await(); // ensure the listener notification occurred on the executor @@ -574,9 +571,9 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); globalCheckpointListeners.add( NO_OPS_PERFORMED, - (g, e) -> { + maybeMultipleInvocationProtectingListener((g, e) -> { throw new RuntimeException("failure"); - }, + }), timeout); latch.await(); final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); @@ -592,10 +589,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE); - final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = (g, e) -> { - assertThat(g, equalTo(NO_OPS_PERFORMED)); - assertNull(e); - }; + final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = + maybeMultipleInvocationProtectingListener((g, e) -> { + assertThat(g, equalTo(NO_OPS_PERFORMED)); + assertNull(e); + }); globalCheckpointListeners.add(NO_OPS_PERFORMED, globalCheckpointListener, timeout); final ScheduledFuture future = globalCheckpointListeners.getTimeoutFuture(globalCheckpointListener); assertNotNull(future); @@ -603,6 +601,21 @@ public class GlobalCheckpointListenersTests extends ESTestCase { assertTrue(future.isCancelled()); } + private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener( + final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) { + if (Assertions.ENABLED) { + final AtomicBoolean invoked = new AtomicBoolean(); + return (g, e) -> { + if (invoked.compareAndSet(false, true) == false) { + throw new AssertionError("listener invoked twice"); + } + globalCheckpointListener.accept(g, e); + }; + } else { + return globalCheckpointListener; + } + } + private void awaitQuietly(final CyclicBarrier barrier) { try { barrier.await(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2c659ac60ec..56a14da845f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -747,7 +747,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { final int index = i; final AtomicLong globalCheckpoint = new AtomicLong(); shard.addGlobalCheckpointListener( - i - 1, + i, (g, e) -> { assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertNull(e); @@ -759,7 +759,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { // adding a listener expecting a lower global checkpoint should fire immediately final AtomicLong immediateGlobalCheckpint = new AtomicLong(); shard.addGlobalCheckpointListener( - randomLongBetween(NO_OPS_PERFORMED, i - 1), + randomLongBetween(0, i), (g, e) -> { assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertNull(e); @@ -770,7 +770,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { } final AtomicBoolean invoked = new AtomicBoolean(); shard.addGlobalCheckpointListener( - numberOfUpdates - 1, + numberOfUpdates, (g, e) -> { invoked.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); @@ -792,7 +792,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { final CountDownLatch latch = new CountDownLatch(1); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); shard.addGlobalCheckpointListener( - NO_OPS_PERFORMED, + 0, (g, e) -> { try { notified.set(true);