diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index bedd1654449..eb9e36eeec0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Assertions; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -150,6 +151,9 @@ public class GlobalCheckpointListeners implements Closeable { @Override public synchronized void close() throws IOException { + if (closed) { + assert listeners.isEmpty() : listeners; + } closed = true; notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); } @@ -188,8 +192,8 @@ public class GlobalCheckpointListeners implements Closeable { } private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { - assert Thread.holdsLock(this); - assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); + assert Thread.holdsLock(this) : Thread.currentThread(); + assertNotification(globalCheckpoint, e); final Map>> listenersToNotify; if (globalCheckpoint != UNASSIGNED_SEQ_NO) { @@ -219,6 +223,8 @@ public class GlobalCheckpointListeners implements Closeable { } private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) { + assertNotification(globalCheckpoint, e); + try { listener.accept(globalCheckpoint, e); } catch (final Exception caught) { @@ -231,10 +237,21 @@ public class GlobalCheckpointListeners implements Closeable { } else if (e instanceof IndexShardClosedException) { logger.warn("error notifying global checkpoint listener of closed shard", caught); } else { - assert e instanceof TimeoutException : e; logger.warn("error notifying global checkpoint listener of timeout", caught); } } } + private void assertNotification(final long globalCheckpoint, final Exception e) { + if (Assertions.ENABLED) { + assert globalCheckpoint >= UNASSIGNED_SEQ_NO : globalCheckpoint; + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + assert e == null : e; + } else { + assert e != null; + assert e instanceof IndexShardClosedException || e instanceof TimeoutException : e; + } + } + } + }