Only notify ready global checkpoint listeners (#33690)

When we add a global checkpoint listener, it is also carries along with
it a value that it thinks is the current global checkpoint. This value
can be above the actual global checkpoint on a shard if the listener
knows the global checkpoint from another shard copy (e.g., the primary),
and the current shard copy is lagging behind. Today we notify the
listener whenever the global checkpoint advances, regardless if it goes
above the current global checkpoint known to the listener. This commit
reworks this implementation. Rather than thinking of the value
associated with the listener as the current global checkpoint known to
the listener, we think of it as the value that the listener is waiting
for the global checkpoint to advance to (inclusive). Now instead of
notifying all waiting listeners when the global checkpoint advances, we
only notify those that are waiting for a value not larger than the
actual global checkpoint that we advanced to.
This commit is contained in:
Jason Tedor 2018-09-14 09:32:03 -04:00 committed by GitHub
parent 4f68104865
commit 39191331d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 166 deletions

View File

@ -21,11 +21,13 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -34,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@ -63,7 +66,7 @@ public class GlobalCheckpointListeners implements Closeable {
// guarded by this // guarded by this
private boolean closed; private boolean closed;
private Map<GlobalCheckpointListener, ScheduledFuture<?>> listeners; private final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listeners = new LinkedHashMap<>();
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;
private final ShardId shardId; private final ShardId shardId;
@ -91,53 +94,47 @@ public class GlobalCheckpointListeners implements Closeable {
} }
/** /**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the * then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners.
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global * If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated above the
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be * global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for * receive subsequent events. Callers may add a timeout to be notified after if the timeout elapses. In this case, the listener will be
* the timeout means no timeout will be associated to the listener. * notified with a {@link TimeoutException}. Passing null fo the timeout means no timeout will be associated to the listener.
* *
* @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param waitingForGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener * @param listener the listener
* @param timeout the listener timeout, or null if no timeout * @param timeout the listener timeout, or null if no timeout
*/ */
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) { if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
return; return;
} }
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) {
// notify directly // notify directly
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
} else { } else {
if (listeners == null) {
listeners = new LinkedHashMap<>();
}
if (timeout == null) { if (timeout == null) {
listeners.put(listener, null); listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null));
} else { } else {
listeners.put( listeners.put(
listener, listener,
Tuple.tuple(
waitingForGlobalCheckpoint,
scheduler.schedule( scheduler.schedule(
() -> { () -> {
final boolean removed; final boolean removed;
synchronized (this) { synchronized (this) {
/* /*
* Note that the listeners map can be null if a notification nulled out the map reference when * We know that this listener has a timeout associated with it (otherwise we would not be
* notifying listeners, and then our scheduled execution occurred before we could be cancelled by * here) so the future component of the return value from remove being null is an indication
* the notification. In this case, we would have blocked waiting for access to this critical * that we are not in the map. This can happen if a notification collected us into listeners
* section. * to be notified and removed us from the map, and then our scheduled execution occurred
* * before we could be cancelled by the notification. In this case, our listener here would
* What is more, we know that this listener has a timeout associated with it (otherwise we would * not be in the map and we should not fire the timeout logic.
* not be here) so the return value from remove being null is an indication that we are not in the
* map. This can happen if a notification nulled out the listeners, and then our scheduled execution
* occurred before we could be cancelled by the notification, and then another thread added a
* listener causing the listeners map reference to be non-null again. In this case, our listener
* here would not be in the map and we should not fire the timeout logic.
*/ */
removed = listeners != null && listeners.remove(listener) != null; removed = listeners.remove(listener).v2() != null;
} }
if (removed) { if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep()); final TimeoutException e = new TimeoutException(timeout.getStringRep());
@ -146,7 +143,7 @@ public class GlobalCheckpointListeners implements Closeable {
} }
}, },
timeout.nanos(), timeout.nanos(),
TimeUnit.NANOSECONDS)); TimeUnit.NANOSECONDS)));
} }
} }
} }
@ -163,7 +160,7 @@ public class GlobalCheckpointListeners implements Closeable {
* @return the number of listeners pending notification * @return the number of listeners pending notification
*/ */
synchronized int pendingListeners() { synchronized int pendingListeners() {
return listeners == null ? 0 : listeners.size(); return listeners.size();
} }
/** /**
@ -173,7 +170,7 @@ public class GlobalCheckpointListeners implements Closeable {
* @return a scheduled future representing the timeout future for the listener, otherwise null * @return a scheduled future representing the timeout future for the listener, otherwise null
*/ */
synchronized ScheduledFuture<?> getTimeoutFuture(final GlobalCheckpointListener listener) { synchronized ScheduledFuture<?> getTimeoutFuture(final GlobalCheckpointListener listener) {
return listeners.get(listener); return listeners.get(listener).v2();
} }
/** /**
@ -193,22 +190,31 @@ public class GlobalCheckpointListeners implements Closeable {
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
if (listeners != null) {
// capture the current listeners final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listenersToNotify;
final Map<GlobalCheckpointListener, ScheduledFuture<?>> currentListeners = listeners; if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
listeners = null; listenersToNotify =
if (currentListeners != null) { listeners
executor.execute(() -> { .entrySet()
for (final Map.Entry<GlobalCheckpointListener, ScheduledFuture<?>> listener : currentListeners.entrySet()) { .stream()
.filter(entry -> entry.getValue().v1() <= globalCheckpoint)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
listenersToNotify.keySet().forEach(listeners::remove);
} else {
listenersToNotify = new HashMap<>(listeners);
listeners.clear();
}
if (listenersToNotify.isEmpty() == false) {
executor.execute(() ->
listenersToNotify
.forEach((listener, t) -> {
/* /*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and * We do not want to interrupt any timeouts that fired, these will detect that the listener has been
* not trigger the timeout. * notified and not trigger the timeout.
*/ */
FutureUtils.cancel(listener.getValue()); FutureUtils.cancel(t.v2());
notifyListener(listener.getKey(), globalCheckpoint, e); notifyListener(listener, globalCheckpoint, e);
} }));
});
}
} }
} }

View File

@ -1781,19 +1781,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
/** /**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
* will be notified with an {@link TimeoutException}. A caller may pass null to specify no timeout. * elapses before the listener is notified, the listener will be notified with an {@link TimeoutException}. A caller may pass null to
* specify no timeout.
* *
* @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for
* @param listener the listener * @param listener the listener
* @param timeout the timeout * @param timeout the timeout
*/ */
public void addGlobalCheckpointListener( public void addGlobalCheckpointListener(
final long currentGlobalCheckpoint, final long waitingForGlobalCheckpoint,
final GlobalCheckpointListeners.GlobalCheckpointListener listener, final GlobalCheckpointListeners.GlobalCheckpointListener listener,
final TimeValue timeout) { final TimeValue timeout) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout); this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
} }
/** /**

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -31,7 +32,9 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -76,62 +79,70 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final GlobalCheckpointListeners globalCheckpointListeners = final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16); final int numberOfListeners = randomIntBetween(0, 64);
final long[] globalCheckpoints = new long[numberOfListeners]; final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listeners = new HashMap<>();
final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> notifiedListeners = new HashMap<>();
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
final int index = i; final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() {
final AtomicBoolean invoked = new AtomicBoolean(); @Override
final GlobalCheckpointListeners.GlobalCheckpointListener listener = public void accept(final long g, final Exception e) {
(g, e) -> { notifiedListeners.put(this, g);
if (invoked.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice");
} }
assert g != UNASSIGNED_SEQ_NO;
assert e == null;
globalCheckpoints[index] = g;
}; };
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
listeners.put(listener, waitingGlobalCheckpoint);
globalCheckpointListeners.add(waitingGlobalCheckpoint, maybeMultipleInvocationProtectingListener(listener), null);
} }
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
for (int i = 0; i < numberOfListeners; i++) { for (final Map.Entry<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listener : listeners.entrySet()) {
assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); if (listener.getValue() <= globalCheckpoint) {
// only listeners waiting on a lower global checkpoint will have been notified
assertThat(notifiedListeners.get(listener.getKey()), equalTo(globalCheckpoint));
} else {
assertNull(notifiedListeners.get(listener.getKey()));
}
} }
// test the listeners are not invoked twice // test the listeners are not invoked twice
final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE); notifiedListeners.clear();
final long nextGlobalCheckpoint = randomLongBetween(1 + globalCheckpoint, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint); globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint);
for (int i = 0; i < numberOfListeners; i++) { for (final Map.Entry<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listener : listeners.entrySet()) {
assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); if (listener.getValue() > globalCheckpoint && listener.getValue() <= nextGlobalCheckpoint) {
// these listeners will have been notified by the second global checkpoint update, and all the other listeners should not be
assertThat(notifiedListeners.get(listener.getKey()), equalTo(nextGlobalCheckpoint));
} else {
assertNull(notifiedListeners.get(listener.getKey()));
}
} }
// closing should also not notify the listeners // closing should also not notify the listeners
notifiedListeners.clear();
globalCheckpointListeners.close(); globalCheckpointListeners.close();
for (int i = 0; i < numberOfListeners; i++) { for (final Map.Entry<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listener : listeners.entrySet()) {
assertThat(globalCheckpoints[i], equalTo(globalCheckpoint)); if (listener.getValue() > nextGlobalCheckpoint) {
// these listeners should have been notified that we closed, and all the other listeners should not be
assertThat(notifiedListeners.get(listener.getKey()), equalTo(UNASSIGNED_SEQ_NO));
} else {
assertNull(notifiedListeners.get(listener.getKey()));
}
} }
} }
public void testListenersReadyToBeNotified() throws IOException { public void testListenersReadyToBeNotified() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners = final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final int numberOfListeners = randomIntBetween(0, 16); final int numberOfListeners = randomIntBetween(0, 16);
final long[] globalCheckpoints = new long[numberOfListeners]; final long[] globalCheckpoints = new long[numberOfListeners];
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
final int index = i; final int index = i;
final AtomicBoolean invoked = new AtomicBoolean(); globalCheckpointListeners.add(
final GlobalCheckpointListeners.GlobalCheckpointListener listener = randomLongBetween(0, globalCheckpoint),
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> globalCheckpoints[index] = g),
if (invoked.compareAndSet(false, true) == false) { null);
throw new IllegalStateException("listener invoked twice");
}
assert g != UNASSIGNED_SEQ_NO;
assert e == null;
globalCheckpoints[index] = g;
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null);
// the listener should be notified immediately // the listener should be notified immediately
assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); assertThat(globalCheckpoints[index], equalTo(globalCheckpoint));
} }
@ -161,18 +172,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
final int index = i; final int index = i;
final boolean failure = randomBoolean(); final boolean failure = randomBoolean();
final GlobalCheckpointListeners.GlobalCheckpointListener listener = globalCheckpointListeners.add(
(g, e) -> { randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1),
assert globalCheckpoint != UNASSIGNED_SEQ_NO; maybeMultipleInvocationProtectingListener((g, e) -> {
assert e == null;
if (failure) { if (failure) {
globalCheckpoints[index] = Long.MIN_VALUE; globalCheckpoints[index] = Long.MIN_VALUE;
throw new RuntimeException("failure"); throw new RuntimeException("failure");
} else { } else {
globalCheckpoints[index] = globalCheckpoint; globalCheckpoints[index] = globalCheckpoint;
} }
}; }),
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); null);
// the listener should be notified immediately // the listener should be notified immediately
if (failure) { if (failure) {
assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE));
@ -202,17 +212,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final Exception[] exceptions = new Exception[numberOfListeners]; final Exception[] exceptions = new Exception[numberOfListeners];
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
final int index = i; final int index = i;
final AtomicBoolean invoked = new AtomicBoolean(); globalCheckpointListeners.add(
final GlobalCheckpointListeners.GlobalCheckpointListener listener = 0, maybeMultipleInvocationProtectingListener((g, e) -> exceptions[index] = e), null);
(globalCheckpoint, e) -> {
if (invoked.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice");
}
assert globalCheckpoint == UNASSIGNED_SEQ_NO;
assert e != null;
exceptions[index] = e;
};
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null);
} }
globalCheckpointListeners.close(); globalCheckpointListeners.close();
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
@ -238,16 +239,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
globalCheckpointListeners.close(); globalCheckpointListeners.close();
final AtomicBoolean invoked = new AtomicBoolean(); final AtomicBoolean invoked = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> { globalCheckpointListeners.add(
assert g == UNASSIGNED_SEQ_NO; randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE),
assert e != null; maybeMultipleInvocationProtectingListener((g, e) -> {
if (invoked.compareAndSet(false, true) == false) { invoked.set(true);
latch.countDown(); latch.countDown();
throw new IllegalStateException("listener invoked twice"); }),
} null);
latch.countDown();
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener, null);
latch.await(); latch.await();
assertTrue(invoked.get()); assertTrue(invoked.get());
} }
@ -264,18 +262,17 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final int index = i; final int index = i;
final boolean failure = randomBoolean(); final boolean failure = randomBoolean();
failures[index] = failure; failures[index] = failure;
final GlobalCheckpointListeners.GlobalCheckpointListener listener = globalCheckpointListeners.add(
(g, e) -> { 0,
assert g != UNASSIGNED_SEQ_NO; maybeMultipleInvocationProtectingListener((g, e) -> {
assert e == null;
if (failure) { if (failure) {
globalCheckpoints[index] = Long.MIN_VALUE; globalCheckpoints[index] = Long.MIN_VALUE;
throw new RuntimeException("failure"); throw new RuntimeException("failure");
} else { } else {
globalCheckpoints[index] = g; globalCheckpoints[index] = g;
} }
}; }),
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); null);
} }
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
@ -319,17 +316,16 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final int index = i; final int index = i;
final boolean failure = randomBoolean(); final boolean failure = randomBoolean();
failures[index] = failure; failures[index] = failure;
final GlobalCheckpointListeners.GlobalCheckpointListener listener = globalCheckpointListeners.add(
(g, e) -> { 0,
assert g == UNASSIGNED_SEQ_NO; maybeMultipleInvocationProtectingListener((g, e) -> {
assert e != null;
if (failure) { if (failure) {
throw new RuntimeException("failure"); throw new RuntimeException("failure");
} else { } else {
exceptions[index] = e; exceptions[index] = e;
} }
}; }),
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); null);
} }
globalCheckpointListeners.close(); globalCheckpointListeners.close();
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
@ -370,12 +366,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final int numberOfListeners = randomIntBetween(0, 16); final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add( globalCheckpointListeners.add(
NO_OPS_PERFORMED, 0,
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet(); notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint)); assertThat(g, equalTo(globalCheckpoint));
assertNull(e); assertNull(e);
}, }),
null); null);
} }
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
@ -396,13 +392,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add( globalCheckpointListeners.add(
NO_OPS_PERFORMED, NO_OPS_PERFORMED,
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet(); notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e); assertNotNull(e);
assertThat(e, instanceOf(IndexShardClosedException.class)); assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
}, }),
null); null);
} }
assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(notified.get(), equalTo(numberOfListeners));
@ -423,11 +419,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
for (int i = 0; i < numberOfListeners; i++) { for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add( globalCheckpointListeners.add(
randomLongBetween(0, globalCheckpoint), randomLongBetween(0, globalCheckpoint),
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet(); notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint)); assertThat(g, equalTo(globalCheckpoint));
assertNull(e); assertNull(e);
}, null); }),
null);
} }
assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners));
@ -472,11 +469,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
// sometimes this will notify the listener immediately // sometimes this will notify the listener immediately
globalCheckpointListeners.add( globalCheckpointListeners.add(
globalCheckpoint.get(), globalCheckpoint.get(),
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
if (invocation.compareAndSet(false, true) == false) { if (invocation.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice"); throw new IllegalStateException("listener invoked twice");
} }
}, }),
randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
} }
// synchronize ending with the updating thread and the main test thread // synchronize ending with the updating thread and the main test thread
@ -511,7 +508,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
globalCheckpointListeners.add( globalCheckpointListeners.add(
NO_OPS_PERFORMED, NO_OPS_PERFORMED,
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
try { try {
notified.set(true); notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
@ -527,7 +524,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
} finally { } finally {
latch.countDown(); latch.countDown();
} }
}, }),
timeout); timeout);
latch.await(); latch.await();
@ -546,7 +543,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
globalCheckpointListeners.add( globalCheckpointListeners.add(
NO_OPS_PERFORMED, NO_OPS_PERFORMED,
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
try { try {
notified.set(true); notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
@ -554,7 +551,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
} finally { } finally {
latch.countDown(); latch.countDown();
} }
}, }),
timeout); timeout);
latch.await(); latch.await();
// ensure the listener notification occurred on the executor // ensure the listener notification occurred on the executor
@ -574,9 +571,9 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
globalCheckpointListeners.add( globalCheckpointListeners.add(
NO_OPS_PERFORMED, NO_OPS_PERFORMED,
(g, e) -> { maybeMultipleInvocationProtectingListener((g, e) -> {
throw new RuntimeException("failure"); throw new RuntimeException("failure");
}, }),
timeout); timeout);
latch.await(); latch.await();
final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class); final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
@ -592,10 +589,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
final GlobalCheckpointListeners globalCheckpointListeners = final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE); final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = (g, e) -> { final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener =
maybeMultipleInvocationProtectingListener((g, e) -> {
assertThat(g, equalTo(NO_OPS_PERFORMED)); assertThat(g, equalTo(NO_OPS_PERFORMED));
assertNull(e); assertNull(e);
}; });
globalCheckpointListeners.add(NO_OPS_PERFORMED, globalCheckpointListener, timeout); globalCheckpointListeners.add(NO_OPS_PERFORMED, globalCheckpointListener, timeout);
final ScheduledFuture<?> future = globalCheckpointListeners.getTimeoutFuture(globalCheckpointListener); final ScheduledFuture<?> future = globalCheckpointListeners.getTimeoutFuture(globalCheckpointListener);
assertNotNull(future); assertNotNull(future);
@ -603,6 +601,21 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assertTrue(future.isCancelled()); assertTrue(future.isCancelled());
} }
private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener(
final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) {
if (Assertions.ENABLED) {
final AtomicBoolean invoked = new AtomicBoolean();
return (g, e) -> {
if (invoked.compareAndSet(false, true) == false) {
throw new AssertionError("listener invoked twice");
}
globalCheckpointListener.accept(g, e);
};
} else {
return globalCheckpointListener;
}
}
private void awaitQuietly(final CyclicBarrier barrier) { private void awaitQuietly(final CyclicBarrier barrier) {
try { try {
barrier.await(); barrier.await();

View File

@ -747,7 +747,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
final int index = i; final int index = i;
final AtomicLong globalCheckpoint = new AtomicLong(); final AtomicLong globalCheckpoint = new AtomicLong();
shard.addGlobalCheckpointListener( shard.addGlobalCheckpointListener(
i - 1, i,
(g, e) -> { (g, e) -> {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e); assertNull(e);
@ -759,7 +759,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
// adding a listener expecting a lower global checkpoint should fire immediately // adding a listener expecting a lower global checkpoint should fire immediately
final AtomicLong immediateGlobalCheckpint = new AtomicLong(); final AtomicLong immediateGlobalCheckpint = new AtomicLong();
shard.addGlobalCheckpointListener( shard.addGlobalCheckpointListener(
randomLongBetween(NO_OPS_PERFORMED, i - 1), randomLongBetween(0, i),
(g, e) -> { (g, e) -> {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e); assertNull(e);
@ -770,7 +770,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
} }
final AtomicBoolean invoked = new AtomicBoolean(); final AtomicBoolean invoked = new AtomicBoolean();
shard.addGlobalCheckpointListener( shard.addGlobalCheckpointListener(
numberOfUpdates - 1, numberOfUpdates,
(g, e) -> { (g, e) -> {
invoked.set(true); invoked.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
@ -792,7 +792,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
shard.addGlobalCheckpointListener( shard.addGlobalCheckpointListener(
NO_OPS_PERFORMED, 0,
(g, e) -> { (g, e) -> {
try { try {
notified.set(true); notified.set(true);