Enable global checkpoint listeners to time out (#33620)

In cross-cluster replication, we will use global checkpoint listeners to
long poll for updates to a shard. However, we do not want these polls to
wait indefinitely as it could be difficult to discern if the listener is
still waiting for updates versus something has gone horribly wrong and
cross-cluster replication is stuck. Instead, we want these listeners to
time out after some period (for example, one minute) so that they are
notified and we can update status on the following side that
cross-cluster replication is still active. After this, we will
immediately enter back into a poll mode.

To do this, we need the ability to associate a timeout with a global
checkpoint listener. This commit adds this capability.
This commit is contained in:
Jason Tedor 2018-09-12 10:53:22 -04:00 committed by GitHub
parent bcac7f5e55
commit 36ba3cda7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 371 additions and 80 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import java.util.concurrent.ExecutionException;
@ -30,8 +31,14 @@ import java.util.concurrent.TimeoutException;
public class FutureUtils {
/**
* Cancel execution of this future without interrupting a running thread. See {@link Future#cancel(boolean)} for details.
*
* @param toCancel the future to cancel
* @return false if the future could not be cancelled, otherwise true
*/
@SuppressForbidden(reason = "Future#cancel()")
public static boolean cancel(Future<?> toCancel) {
public static boolean cancel(@Nullable final Future<?> toCancel) {
if (toCancel != null) {
return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads
}

View File

@ -21,13 +21,19 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@ -45,38 +51,43 @@ public class GlobalCheckpointListeners implements Closeable {
public interface GlobalCheckpointListener {
/**
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
* global checkpoint is updated, the exception will be null.
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
* instance of {@link IndexShardClosedException}. If the listener timed out waiting for notification then the exception will be
* non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
*
* @param globalCheckpoint the updated global checkpoint
* @param e if non-null, the shard is closed
* @param e if non-null, the shard is closed or the listener timed out
*/
void accept(long globalCheckpoint, IndexShardClosedException e);
void accept(long globalCheckpoint, Exception e);
}
// guarded by this
private boolean closed;
private volatile List<GlobalCheckpointListener> listeners;
private volatile Map<GlobalCheckpointListener, ScheduledFuture<?>> listeners;
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;
private final ShardId shardId;
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final Logger logger;
/**
* Construct a global checkpoint listeners collection.
*
* @param shardId the shard ID on which global checkpoint updates can be listened to
* @param executor the executor for listener notifications
* @param logger a shard-level logger
* @param shardId the shard ID on which global checkpoint updates can be listened to
* @param executor the executor for listener notifications
* @param scheduler the executor used for scheduling timeouts
* @param logger a shard-level logger
*/
GlobalCheckpointListeners(
final ShardId shardId,
final Executor executor,
final ScheduledExecutorService scheduler,
final Logger logger) {
this.shardId = Objects.requireNonNull(shardId);
this.executor = Objects.requireNonNull(executor);
this.logger = Objects.requireNonNull(logger);
this.shardId = Objects.requireNonNull(shardId, "shardId");
this.executor = Objects.requireNonNull(executor, "executor");
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
this.logger = Objects.requireNonNull(logger, "logger");
}
/**
@ -84,12 +95,15 @@ public class GlobalCheckpointListeners implements Closeable {
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events.
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout so that the
* listener is notified if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null
* for the timeout means no timeout will be associated with the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
return;
@ -99,9 +113,41 @@ public class GlobalCheckpointListeners implements Closeable {
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
} else {
if (listeners == null) {
listeners = new ArrayList<>();
listeners = new LinkedHashMap<>();
}
if (timeout == null) {
listeners.put(listener, null);
} else {
listeners.put(
listener,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
/*
* Note that the listeners map can be null if a notification nulled out the map reference when
* notifying listeners, and then our scheduled execution occurred before we could be cancelled by
* the notification. In this case, we would have blocked waiting for access to this critical
* section.
*
* What is more, we know that this listener has a timeout associated with it (otherwise we would
* not be here) so the return value from remove being null is an indication that we are not in the
* map. This can happen if a notification nulled out the listeners, and then our scheduled execution
* occurred before we could be cancelled by the notification, and then another thread added a
* listener causing the listeners map reference to be non-null again. In this case, our listener
* here would not be in the map and we should not fire the timeout logic.
*/
removed = listeners != null && listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS));
}
listeners.add(listener);
}
}
@ -111,10 +157,25 @@ public class GlobalCheckpointListeners implements Closeable {
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
}
/**
 * Reports how many listeners are registered and still waiting to be notified.
 *
 * @return the count of registered listeners, or zero when no listeners map is currently allocated
 */
synchronized int pendingListeners() {
    // the map is allocated lazily on registration and reset to null on notification
    if (listeners == null) {
        return 0;
    }
    return listeners.size();
}
/**
 * The scheduled future for a listener that has a timeout associated with it, otherwise null. Null is also returned when no listeners
 * are currently registered, which is the case before the first registration and after the registered listeners have been notified.
 *
 * @param listener the listener to get the scheduled future for
 * @return a scheduled future representing the timeout future for the listener, otherwise null
 */
synchronized ScheduledFuture<?> getTimeoutFuture(final GlobalCheckpointListener listener) {
    // the listeners map is allocated lazily and nulled out on notification, so it must not be dereferenced blindly
    return listeners == null ? null : listeners.get(listener);
}
/**
* Invoke to notify all registered listeners of an updated global checkpoint.
*
@ -134,19 +195,24 @@ public class GlobalCheckpointListeners implements Closeable {
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
if (listeners != null) {
// capture the current listeners
final List<GlobalCheckpointListener> currentListeners = listeners;
final Map<GlobalCheckpointListener, ScheduledFuture<?>> currentListeners = listeners;
listeners = null;
if (currentListeners != null) {
executor.execute(() -> {
for (final GlobalCheckpointListener listener : currentListeners) {
notifyListener(listener, globalCheckpoint, e);
for (final Map.Entry<GlobalCheckpointListener, ScheduledFuture<?>> listener : currentListeners.entrySet()) {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and
* not trigger the timeout.
*/
FutureUtils.cancel(listener.getValue());
notifyListener(listener.getKey(), globalCheckpoint, e);
}
});
}
}
}
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) {
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) {
try {
listener.accept(globalCheckpoint, e);
} catch (final Exception caught) {
@ -156,8 +222,11 @@ public class GlobalCheckpointListeners implements Closeable {
"error notifying global checkpoint listener of updated global checkpoint [{}]",
globalCheckpoint),
caught);
} else {
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
assert e instanceof TimeoutException : e;
logger.warn("error notifying global checkpoint listener of timeout", caught);
}
}
}

View File

@ -302,7 +302,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
final String aId = shardRouting.allocationId().getId();
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
this.replicationTracker =
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
@ -1781,15 +1782,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will fire immediately on the calling thread.
* listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener
* will be notified with a {@link TimeoutException}. A caller may pass null to specify no timeout.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
*/
public void addGlobalCheckpointListener(
final long currentGlobalCheckpoint,
final GlobalCheckpointListeners.GlobalCheckpointListener listener) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener);
final GlobalCheckpointListeners.GlobalCheckpointListener listener,
final TimeValue timeout) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout);
}
/**

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.Future;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class FutureUtilsTests extends ESTestCase {

    /**
     * Passing a null future to {@link FutureUtils#cancel(Future)} must be tolerated; the test passes as long as the call returns
     * without throwing.
     */
    public void testCancellingNullFutureOkay() {
        FutureUtils.cancel(null);
    }

    /**
     * Cancellation must be delegated to {@link Future#cancel(boolean)} with {@code false} so that a thread running the task is not
     * interrupted.
     */
    public void testRunningFutureNotInterrupted() {
        final Future<?> mockedFuture = mock(Future.class);
        FutureUtils.cancel(mockedFuture);
        verify(mockedFuture).cancel(false);
    }

}

View File

@ -21,8 +21,12 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
@ -35,14 +39,20 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@ -50,10 +60,18 @@ import static org.mockito.Mockito.verify;
public class GlobalCheckpointListenersTests extends ESTestCase {
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
private final ScheduledThreadPoolExecutor scheduler =
new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler"));
/**
 * Runs after each test and initiates shutdown of the scheduler that the tests pass to {@link GlobalCheckpointListeners} for
 * scheduling listener timeouts.
 */
@After
public void shutdownScheduler() {
scheduler.shutdown();
}
public void testGlobalCheckpointUpdated() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final long[] globalCheckpoints = new long[numberOfListeners];
@ -69,7 +87,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assert e == null;
globalCheckpoints[index] = g;
};
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null);
}
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
@ -92,7 +110,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
}
public void testListenersReadyToBeNotified() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final int numberOfListeners = randomIntBetween(0, 16);
@ -109,7 +128,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assert e == null;
globalCheckpoints[index] = g;
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener);
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null);
// the listener should be notified immediately
assertThat(globalCheckpoints[index], equalTo(globalCheckpoint));
}
@ -130,7 +149,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testFailingListenerReadyToBeNotified() {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final int numberOfListeners = randomIntBetween(0, 16);
@ -149,7 +169,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
globalCheckpoints[index] = globalCheckpoint;
}
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener);
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null);
// the listener should be notified immediately
if (failure) {
assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE));
@ -172,10 +192,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
}
public void testClose() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners];
final Exception[] exceptions = new Exception[numberOfListeners];
for (int i = 0; i < numberOfListeners; i++) {
final int index = i;
final AtomicBoolean invoked = new AtomicBoolean();
@ -188,12 +209,13 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assert e != null;
exceptions[index] = e;
};
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null);
}
globalCheckpointListeners.close();
for (int i = 0; i < numberOfListeners; i++) {
assertNotNull(exceptions[i]);
assertThat(exceptions[i].getShardId(), equalTo(shardId));
assertThat(exceptions[i], instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException)exceptions[i]).getShardId(), equalTo(shardId));
}
// test the listeners are not invoked twice
@ -207,7 +229,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
}
public void testAddAfterClose() throws InterruptedException, IOException {
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
globalCheckpointListeners.close();
final AtomicBoolean invoked = new AtomicBoolean();
@ -221,14 +244,15 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
}
latch.countDown();
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener);
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener, null);
latch.await();
assertTrue(invoked.get());
}
public void testFailingListenerOnUpdate() {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final boolean[] failures = new boolean[numberOfListeners];
@ -248,7 +272,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
globalCheckpoints[index] = g;
}
};
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null);
}
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
@ -282,11 +306,12 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testFailingListenerOnClose() throws IOException {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final boolean[] failures = new boolean[numberOfListeners];
final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners];
final Exception[] exceptions = new Exception[numberOfListeners];
for (int i = 0; i < numberOfListeners; i++) {
final int index = i;
final boolean failure = randomBoolean();
@ -301,7 +326,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
exceptions[index] = e;
}
};
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null);
}
globalCheckpointListeners.close();
for (int i = 0; i < numberOfListeners; i++) {
@ -309,7 +334,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
assertNull(exceptions[i]);
} else {
assertNotNull(exceptions[i]);
assertThat(exceptions[i].getShardId(), equalTo(shardId));
assertThat(exceptions[i], instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException)exceptions[i]).getShardId(), equalTo(shardId));
}
}
int failureCount = 0;
@ -334,17 +360,20 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
});
globalCheckpointListeners.add(
NO_OPS_PERFORMED,
(g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
},
null);
}
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
assertThat(notified.get(), equalTo(numberOfListeners));
@ -357,17 +386,21 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
globalCheckpointListeners.close();
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e.getShardId(), equalTo(shardId));
});
globalCheckpointListeners.add(
NO_OPS_PERFORMED,
(g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
},
null);
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
@ -379,17 +412,19 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final long globalCheckpoint = randomNonNegativeLong();
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(randomLongBetween(0, globalCheckpoint), (g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
});
globalCheckpointListeners.add(
randomLongBetween(0, globalCheckpoint),
(g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
}, null);
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
@ -397,18 +432,18 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testConcurrency() throws BrokenBarrierException, InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
// we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread
final CyclicBarrier barrier = new CyclicBarrier(3);
final int numberOfIterations = randomIntBetween(1, 1024);
final int numberOfIterations = randomIntBetween(1, 4096);
final AtomicBoolean closed = new AtomicBoolean();
final Thread updatingThread = new Thread(() -> {
// synchronize starting with the listener thread and the main test thread
awaitQuietly(barrier);
for (int i = 0; i < numberOfIterations; i++) {
if (rarely() && closed.get() == false) {
if (i > numberOfIterations / 2 && rarely() && closed.get() == false) {
closed.set(true);
try {
globalCheckpointListeners.close();
@ -416,7 +451,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
throw new UncheckedIOException(e);
}
}
if (closed.get() == false) {
if (rarely() && closed.get() == false) {
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
}
}
@ -438,7 +473,8 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
if (invocation.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice");
}
});
},
randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
}
// synchronize ending with the updating thread and the main test thread
awaitQuietly(barrier);
@ -463,6 +499,107 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
listenersThread.join();
}
/**
 * Registers a listener with a short timeout and never updates the global checkpoint, then checks that the listener is notified with
 * {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and a {@link TimeoutException} carrying the timeout, and
 * that the timeout was logged at trace level.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the listener
 */
public void testTimeout() throws InterruptedException {
    final Logger mockLogger = mock(Logger.class);
    final GlobalCheckpointListeners globalCheckpointListeners =
            new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
    final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
    final AtomicBoolean notified = new AtomicBoolean();
    /*
     * The listener runs on the scheduler thread, so an assertion failure thrown there would never reach the test thread. Capture any
     * failure here and rethrow it on the test thread; the latch provides the necessary happens-before edge for reading the holder.
     */
    final Throwable[] failure = new Throwable[1];
    final CountDownLatch latch = new CountDownLatch(1);
    globalCheckpointListeners.add(
            NO_OPS_PERFORMED,
            (g, e) -> {
                try {
                    notified.set(true);
                    assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
                    assertThat(e, instanceOf(TimeoutException.class));
                    assertThat(e, hasToString(containsString(timeout.getStringRep())));
                    final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
                    final ArgumentCaptor<TimeoutException> t = ArgumentCaptor.forClass(TimeoutException.class);
                    verify(mockLogger).trace(message.capture(), t.capture());
                    assertThat(message.getValue(), equalTo("global checkpoint listener timed out"));
                    assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep())));
                } catch (final Exception | AssertionError caught) {
                    // record the actual failure (not the exception handed to the listener) for the test thread
                    failure[0] = caught;
                } finally {
                    latch.countDown();
                }
            },
            timeout);
    latch.await();
    if (failure[0] != null) {
        throw new AssertionError("assertions inside the listener failed", failure[0]);
    }
    assertTrue(notified.get());
}
/**
 * Checks that a timeout notification is dispatched through the executor supplied to {@link GlobalCheckpointListeners} by counting
 * the tasks submitted to a wrapping executor.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the listener
 */
public void testTimeoutNotificationUsesExecutor() throws InterruptedException {
    final AtomicInteger count = new AtomicInteger();
    final Executor executor = command -> {
        count.incrementAndGet();
        command.run();
    };
    final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
    final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
    final AtomicBoolean notified = new AtomicBoolean();
    /*
     * The listener runs on the scheduler thread, so an assertion failure thrown there would never reach the test thread. Capture any
     * failure here and rethrow it on the test thread; the latch provides the necessary happens-before edge for reading the holder.
     */
    final Throwable[] failure = new Throwable[1];
    final CountDownLatch latch = new CountDownLatch(1);
    globalCheckpointListeners.add(
            NO_OPS_PERFORMED,
            (g, e) -> {
                try {
                    notified.set(true);
                    assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
                    assertThat(e, instanceOf(TimeoutException.class));
                } catch (final AssertionError caught) {
                    failure[0] = caught;
                } finally {
                    latch.countDown();
                }
            },
            timeout);
    latch.await();
    if (failure[0] != null) {
        throw new AssertionError("assertions inside the listener failed", failure[0]);
    }
    // ensure the listener notification occurred on the executor
    assertTrue(notified.get());
    assertThat(count.get(), equalTo(1));
}
/**
 * Verifies that when a listener throws while being notified of a timeout, the failure is logged at warn level together with the
 * exception that the listener threw.
 */
public void testFailingListenerAfterTimeout() throws InterruptedException {
    final Logger mockLogger = mock(Logger.class);
    final GlobalCheckpointListeners globalCheckpointListeners =
        new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
    final CountDownLatch latch = new CountDownLatch(1);
    final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
    globalCheckpointListeners.add(
        NO_OPS_PERFORMED,
        (g, e) -> {
            try {
                throw new RuntimeException("failure");
            } finally {
                latch.countDown();
            }
        },
        timeout);
    latch.await();
    final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
    final ArgumentCaptor<RuntimeException> t = ArgumentCaptor.forClass(RuntimeException.class);
    /*
     * The latch is released from the listener's finally block, which runs before the thrown exception can propagate to the code that
     * logs it. The warning may therefore not have been logged yet when this thread wakes up, so wait for the interaction (for up to
     * ten seconds) instead of verifying it exactly once and racing with the notifying thread.
     */
    verify(mockLogger, org.mockito.Mockito.timeout(10000)).warn(message.capture(), t.capture());
    assertThat(message.getValue(), equalTo("error notifying global checkpoint listener of timeout"));
    assertNotNull(t.getValue());
    assertThat(t.getValue(), instanceOf(RuntimeException.class));
    assertThat(t.getValue().getMessage(), equalTo("failure"));
}
/**
 * Checks that the timeout future associated with a listener is cancelled once the listener has been notified through a global
 * checkpoint update.
 */
public void testTimeoutCancelledAfterListenerNotified() {
    final GlobalCheckpointListeners listeners = new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
    final GlobalCheckpointListeners.GlobalCheckpointListener listener = (checkpoint, exception) -> {
        assertThat(checkpoint, equalTo(NO_OPS_PERFORMED));
        assertNull(exception);
    };
    // the largest representable timeout, so the notification below is expected to arrive well before the timeout could elapse
    listeners.add(NO_OPS_PERFORMED, listener, TimeValue.timeValueNanos(Long.MAX_VALUE));
    final ScheduledFuture<?> timeoutFuture = listeners.getTimeoutFuture(listener);
    assertNotNull(timeoutFuture);
    listeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
    assertTrue(timeoutFuture.isCancelled());
}
private void awaitQuietly(final CyclicBarrier barrier) {
try {
barrier.await();

View File

@ -89,6 +89,7 @@ import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -113,6 +114,8 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
public class IndexShardIT extends ESSingleNodeTestCase {
@ -746,10 +749,11 @@ public class IndexShardIT extends ESSingleNodeTestCase {
shard.addGlobalCheckpointListener(
i - 1,
(g, e) -> {
assert g >= NO_OPS_PERFORMED;
assert e == null;
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
globalCheckpoint.set(g);
});
},
null);
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index)));
// adding a listener expecting a lower global checkpoint should fire immediately
@ -757,10 +761,11 @@ public class IndexShardIT extends ESSingleNodeTestCase {
shard.addGlobalCheckpointListener(
randomLongBetween(NO_OPS_PERFORMED, i - 1),
(g, e) -> {
assert g >= NO_OPS_PERFORMED;
assert e == null;
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
immediateGlobalCheckpint.set(g);
});
},
null);
assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index)));
}
final AtomicBoolean invoked = new AtomicBoolean();
@ -768,12 +773,40 @@ public class IndexShardIT extends ESSingleNodeTestCase {
numberOfUpdates - 1,
(g, e) -> {
invoked.set(true);
assert g == UNASSIGNED_SEQ_NO;
assert e != null;
assertThat(e.getShardId(), equalTo(shard.shardId()));
});
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId()));
},
null);
shard.close("closed", randomBoolean());
assertBusy(() -> assertTrue(invoked.get()));
}
/**
 * Adds a global checkpoint listener with a short timeout to the single shard of a freshly created index and checks that the
 * listener is notified with a {@link TimeoutException} whose message is the string representation of the timeout.
 */
public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
    final Settings indexSettings = Settings.builder()
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        .build();
    createIndex("test", indexSettings);
    ensureGreen();
    final IndexShard shard = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")).getShardOrNull(0);
    final TimeValue shortTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
    final CountDownLatch listenerDone = new CountDownLatch(1);
    final AtomicBoolean listenerInvoked = new AtomicBoolean();
    shard.addGlobalCheckpointListener(
        NO_OPS_PERFORMED,
        (checkpoint, exception) -> {
            try {
                listenerInvoked.set(true);
                assertThat(checkpoint, equalTo(UNASSIGNED_SEQ_NO));
                assertNotNull(exception);
                assertThat(exception, instanceOf(TimeoutException.class));
                assertThat(exception.getMessage(), equalTo(shortTimeout.getStringRep()));
            } finally {
                // always release the test thread, even if one of the assertions above fails
                listenerDone.countDown();
            }
        },
        shortTimeout);
    listenerDone.await();
    assertTrue(listenerInvoked.get());
}
}