Use given executor for global checkpoint listener (#53260)

Today when notifying a global checkpoint listener, we use the listener
thread pool. This commit turns this inside out so that the global
checkpoint listener must provide an executor on which to notify the
listener.
This commit is contained in:
Jason Tedor 2020-03-08 13:48:09 -04:00
parent 79b67eb3ba
commit 5e96d3e59a
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
6 changed files with 360 additions and 182 deletions

View File

@ -51,8 +51,15 @@ public class GlobalCheckpointListeners implements Closeable {
/**
* A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed.
*/
@FunctionalInterface
public interface GlobalCheckpointListener {
/**
* The executor on which the listener is notified.
*
* @return the executor
*/
Executor executor();
/**
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
@ -63,6 +70,7 @@ public class GlobalCheckpointListeners implements Closeable {
* @param e if non-null, the shard is closed or the listener timed out
*/
void accept(long globalCheckpoint, Exception e);
}
// guarded by this
@ -71,7 +79,6 @@ public class GlobalCheckpointListeners implements Closeable {
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;
private final ShardId shardId;
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final Logger logger;
@ -79,17 +86,14 @@ public class GlobalCheckpointListeners implements Closeable {
* Construct a global checkpoint listeners collection.
*
* @param shardId the shard ID on which global checkpoint updates can be listened to
* @param executor the executor for listener notifications
* @param scheduler the executor used for scheduling timeouts
* @param logger a shard-level logger
*/
GlobalCheckpointListeners(
final ShardId shardId,
final Executor executor,
final ScheduledExecutorService scheduler,
final Logger logger) {
final ShardId shardId,
final ScheduledExecutorService scheduler,
final Logger logger) {
this.shardId = Objects.requireNonNull(shardId, "shardId");
this.executor = Objects.requireNonNull(executor, "executor");
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
this.logger = Objects.requireNonNull(logger, "logger");
}
@ -109,12 +113,12 @@ public class GlobalCheckpointListeners implements Closeable {
*/
synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
return;
}
if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) {
// notify directly
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
notifyListener(listener, lastKnownGlobalCheckpoint, null);
} else {
if (timeout == null) {
listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null));
@ -140,7 +144,7 @@ public class GlobalCheckpointListeners implements Closeable {
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
notifyListener(listener, UNASSIGNED_SEQ_NO, e);
}
},
timeout.nanos(),
@ -193,7 +197,6 @@ public class GlobalCheckpointListeners implements Closeable {
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert Thread.holdsLock(this) : Thread.currentThread();
assertNotification(globalCheckpoint, e);
// early return if there are no listeners
if (listeners.isEmpty()) {
@ -214,37 +217,38 @@ public class GlobalCheckpointListeners implements Closeable {
listeners.clear();
}
if (listenersToNotify.isEmpty() == false) {
executor.execute(() ->
listenersToNotify
.forEach((listener, t) -> {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been
* notified and not trigger the timeout.
*/
FutureUtils.cancel(t.v2());
notifyListener(listener, globalCheckpoint, e);
}));
listenersToNotify
.forEach((listener, t) -> {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and not
* trigger the timeout.
*/
FutureUtils.cancel(t.v2());
notifyListener(listener, globalCheckpoint, e);
});
}
}
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) {
assertNotification(globalCheckpoint, e);
try {
listener.accept(globalCheckpoint, e);
} catch (final Exception caught) {
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
logger.warn(
listener.executor().execute(() -> {
try {
listener.accept(globalCheckpoint, e);
} catch (final Exception caught) {
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
logger.warn(
new ParameterizedMessage(
"error notifying global checkpoint listener of updated global checkpoint [{}]",
globalCheckpoint),
"error notifying global checkpoint listener of updated global checkpoint [{}]",
globalCheckpoint),
caught);
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
logger.warn("error notifying global checkpoint listener of timeout", caught);
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
logger.warn("error notifying global checkpoint listener of timeout", caught);
}
}
}
});
}
private void assertNotification(final long globalCheckpoint, final Exception e) {

View File

@ -335,7 +335,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
this.replicationTracker = new ReplicationTracker(
shardId,
aId,

View File

@ -0,0 +1,170 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.After;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
public class GlobalCheckpointListenersIT extends ESSingleNodeTestCase {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
@After
public void shutdownExecutor() {
executor.shutdown();
}
public void testGlobalCheckpointListeners() throws Exception {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
final int numberOfUpdates = randomIntBetween(1, 128);
for (int i = 0; i < numberOfUpdates; i++) {
final int index = i;
final AtomicLong globalCheckpoint = new AtomicLong();
shard.addGlobalCheckpointListener(
i,
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
globalCheckpoint.set(g);
}
},
null);
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index)));
// adding a listener expecting a lower global checkpoint should fire immediately
final AtomicLong immediateGlobalCheckpint = new AtomicLong();
shard.addGlobalCheckpointListener(
randomLongBetween(0, i),
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
immediateGlobalCheckpint.set(g);
}
},
null);
assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index)));
}
final AtomicBoolean invoked = new AtomicBoolean();
shard.addGlobalCheckpointListener(
numberOfUpdates,
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
invoked.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId()));
}
},
null);
shard.close("closed", randomBoolean());
assertBusy(() -> assertTrue(invoked.get()));
}
public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
final AtomicBoolean notified = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
shard.addGlobalCheckpointListener(
0,
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
} finally {
latch.countDown();
}
}
},
timeout);
latch.await();
assertTrue(notified.get());
}
}

View File

@ -67,6 +67,15 @@ import static org.mockito.Mockito.verify;
public class GlobalCheckpointListenersTests extends ESTestCase {
@FunctionalInterface
interface TestGlobalCheckpointListener extends GlobalCheckpointListeners.GlobalCheckpointListener {
default Executor executor() {
return Runnable::run;
}
}
private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
private final ScheduledThreadPoolExecutor scheduler =
new Scheduler.SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler"));
@ -78,17 +87,19 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testGlobalCheckpointUpdated() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
new GlobalCheckpointListeners(shardId, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 64);
final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listeners = new HashMap<>();
final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> notifiedListeners = new HashMap<>();
for (int i = 0; i < numberOfListeners; i++) {
final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() {
final TestGlobalCheckpointListener listener = new TestGlobalCheckpointListener() {
@Override
public void accept(final long g, final Exception e) {
notifiedListeners.put(this, g);
}
};
final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
listeners.put(listener, waitingGlobalCheckpoint);
@ -133,7 +144,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testListenersReadyToBeNotified() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
new GlobalCheckpointListeners(shardId, scheduler, logger);
final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final int numberOfListeners = randomIntBetween(0, 16);
@ -165,7 +176,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testFailingListenerReadyToBeNotified() {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final int numberOfListeners = randomIntBetween(0, 16);
@ -207,7 +218,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testClose() throws IOException {
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
new GlobalCheckpointListeners(shardId, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final Exception[] exceptions = new Exception[numberOfListeners];
@ -235,7 +246,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testAddAfterClose() throws InterruptedException, IOException {
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
new GlobalCheckpointListeners(shardId, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
globalCheckpointListeners.close();
final AtomicBoolean invoked = new AtomicBoolean();
@ -254,7 +265,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testFailingListenerOnUpdate() {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final boolean[] failures = new boolean[numberOfListeners];
@ -308,7 +319,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testFailingListenerOnClose() throws IOException {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final int numberOfListeners = randomIntBetween(0, 16);
final boolean[] failures = new boolean[numberOfListeners];
@ -360,24 +371,35 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(
0,
maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
0,
maybeMultipleInvocationProtectingListener(
new TestGlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
}
}),
null);
null);
}
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners == 0 ? 0 : 1));
assertThat(count.get(), equalTo(numberOfListeners));
}
public void testNotificationOnClosedUsesExecutor() throws IOException {
@ -386,21 +408,32 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
globalCheckpointListeners.close();
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(
NO_OPS_PERFORMED,
maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
NO_OPS_PERFORMED,
maybeMultipleInvocationProtectingListener(
new TestGlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
notified.incrementAndGet();
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
}
}),
null);
null);
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
@ -412,20 +445,30 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
final long globalCheckpoint = randomNonNegativeLong();
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
final AtomicInteger notified = new AtomicInteger();
final int numberOfListeners = randomIntBetween(0, 16);
for (int i = 0; i < numberOfListeners; i++) {
globalCheckpointListeners.add(
randomLongBetween(0, globalCheckpoint),
maybeMultipleInvocationProtectingListener((g, e) -> {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
randomLongBetween(0, globalCheckpoint),
maybeMultipleInvocationProtectingListener(
new TestGlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
notified.incrementAndGet();
assertThat(g, equalTo(globalCheckpoint));
assertNull(e);
}
}),
null);
null);
}
assertThat(notified.get(), equalTo(numberOfListeners));
assertThat(count.get(), equalTo(numberOfListeners));
@ -433,7 +476,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testConcurrency() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
// we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread
@ -469,14 +512,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
invocations.add(invocation);
// sometimes this will notify the listener immediately
globalCheckpointListeners.add(
globalCheckpoint.get(),
maybeMultipleInvocationProtectingListener(
(g, e) -> {
if (invocation.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice");
}
}),
randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
globalCheckpoint.get(),
maybeMultipleInvocationProtectingListener(
new TestGlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
if (invocation.compareAndSet(false, true) == false) {
throw new IllegalStateException("listener invoked twice");
}
}
}),
randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
}
// synchronize ending with the updating thread and the main test thread
awaitQuietly(barrier);
@ -506,7 +559,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testTimeout() throws InterruptedException {
final Logger mockLogger = mock(Logger.class);
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
final AtomicBoolean notified = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
@ -541,22 +594,33 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
count.incrementAndGet();
command.run();
};
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
final AtomicBoolean notified = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
globalCheckpointListeners.add(
NO_OPS_PERFORMED,
maybeMultipleInvocationProtectingListener((g, e) -> {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
} finally {
latch.countDown();
NO_OPS_PERFORMED,
maybeMultipleInvocationProtectingListener(
new TestGlobalCheckpointListener() {
@Override
public Executor executor() {
return executor;
}
@Override
public void accept(final long g, final Exception e) {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
} finally {
latch.countDown();
}
}
}),
timeout);
timeout);
latch.await();
// ensure the listener notification occurred on the executor
assertTrue(notified.get());
@ -571,7 +635,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
return null;
}).when(mockLogger).warn(argThat(any(String.class)), argThat(any(RuntimeException.class)));
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
globalCheckpointListeners.add(
NO_OPS_PERFORMED,
@ -591,7 +655,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
public void testTimeoutCancelledAfterListenerNotified() {
final GlobalCheckpointListeners globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
new GlobalCheckpointListeners(shardId, scheduler, logger);
final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener =
maybeMultipleInvocationProtectingListener((g, e) -> {
@ -606,14 +670,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
}
private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener(
final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) {
final TestGlobalCheckpointListener globalCheckpointListener) {
if (Assertions.ENABLED) {
final AtomicBoolean invoked = new AtomicBoolean();
return (g, e) -> {
if (invoked.compareAndSet(false, true) == false) {
throw new AssertionError("listener invoked twice");
return new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return globalCheckpointListener.executor();
}
globalCheckpointListener.accept(g, e);
@Override
public void accept(final long g, final Exception e) {
if (invoked.compareAndSet(false, true) == false) {
throw new AssertionError("listener invoked twice");
}
globalCheckpointListener.accept(g, e);
}
};
} else {
return globalCheckpointListener;

View File

@ -101,9 +101,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -116,8 +114,6 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -128,7 +124,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
@ -688,83 +683,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
return shardRouting;
}
public void testGlobalCheckpointListeners() throws Exception {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
final int numberOfUpdates = randomIntBetween(1, 128);
for (int i = 0; i < numberOfUpdates; i++) {
final int index = i;
final AtomicLong globalCheckpoint = new AtomicLong();
shard.addGlobalCheckpointListener(
i,
(g, e) -> {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
globalCheckpoint.set(g);
},
null);
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index)));
// adding a listener expecting a lower global checkpoint should fire immediately
final AtomicLong immediateGlobalCheckpint = new AtomicLong();
shard.addGlobalCheckpointListener(
randomLongBetween(0, i),
(g, e) -> {
assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED));
assertNull(e);
immediateGlobalCheckpint.set(g);
},
null);
assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index)));
}
final AtomicBoolean invoked = new AtomicBoolean();
shard.addGlobalCheckpointListener(
numberOfUpdates,
(g, e) -> {
invoked.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(IndexShardClosedException.class));
assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId()));
},
null);
shard.close("closed", randomBoolean());
assertBusy(() -> assertTrue(invoked.get()));
}
public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
final AtomicBoolean notified = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
shard.addGlobalCheckpointListener(
0,
(g, e) -> {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
} finally {
latch.countDown();
}
},
timeout);
latch.await();
assertTrue(notified.get());
}
public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)

View File

@ -8,10 +8,10 @@ package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.MissingHistoryOperationsException;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.GlobalCheckpointListeners;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
@ -48,6 +49,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -388,18 +390,28 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo());
indexShard.addGlobalCheckpointListener(
request.getFromSeqNo(),
(g, e) -> {
request.getFromSeqNo(),
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return threadPool.executor(ThreadPool.Names.LISTENER);
}
@Override
public void accept(final long g, final Exception e) {
if (g != UNASSIGNED_SEQ_NO) {
assert request.getFromSeqNo() <= g
: shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]";
: shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]";
globalCheckpointAdvanced(shardId, g, request, listener);
} else {
assert e != null;
globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard);
}
},
request.getPollTimeout());
}
},
request.getPollTimeout());
} else {
super.asyncShardOperation(request, shardId, listener);
}