Cleanup assertions in global checkpoint listeners (#33722)

This commit is a cleanup of the assertions in global checkpoint
listeners, simplifying them and adding some messages to them in case the
assertions trip.
This commit is contained in:
Jason Tedor 2018-09-14 14:45:58 -04:00 committed by GitHub
parent 82a6ae1dae
commit a0f0d7860e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 3 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
@ -150,6 +151,9 @@ public class GlobalCheckpointListeners implements Closeable {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) {
assert listeners.isEmpty() : listeners;
}
closed = true; closed = true;
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
} }
@ -188,8 +192,8 @@ public class GlobalCheckpointListeners implements Closeable {
} }
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this) : Thread.currentThread();
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); assertNotification(globalCheckpoint, e);
final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listenersToNotify; final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listenersToNotify;
if (globalCheckpoint != UNASSIGNED_SEQ_NO) { if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
@ -219,6 +223,8 @@ public class GlobalCheckpointListeners implements Closeable {
} }
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) { private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) {
assertNotification(globalCheckpoint, e);
try { try {
listener.accept(globalCheckpoint, e); listener.accept(globalCheckpoint, e);
} catch (final Exception caught) { } catch (final Exception caught) {
@ -231,10 +237,21 @@ public class GlobalCheckpointListeners implements Closeable {
} else if (e instanceof IndexShardClosedException) { } else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught); logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else { } else {
assert e instanceof TimeoutException : e;
logger.warn("error notifying global checkpoint listener of timeout", caught); logger.warn("error notifying global checkpoint listener of timeout", caught);
} }
} }
} }
private void assertNotification(final long globalCheckpoint, final Exception e) {
if (Assertions.ENABLED) {
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : globalCheckpoint;
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
assert e == null : e;
} else {
assert e != null;
assert e instanceof IndexShardClosedException || e instanceof TimeoutException : e;
}
}
}
} }