diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java
new file mode 100644
index 00000000000..e279badec4a
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+
+/**
+ * Represents a collection of global checkpoint listeners. This collection can be added to, and all listeners present at the time of an
+ * update will be notified together. All listeners will be notified when the shard is closed.
+ */
+public class GlobalCheckpointListeners implements Closeable {
+
+    /**
+     * A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed.
+     */
+    @FunctionalInterface
+    public interface GlobalCheckpointListener {
+        /**
+         * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
+         * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
+         * global checkpoint is updated, the exception will be null.
+         *
+         * @param globalCheckpoint the updated global checkpoint
+         * @param e                if non-null, the shard is closed
+         */
+        void accept(long globalCheckpoint, IndexShardClosedException e);
+    }
+
+    // the mutable state below is guarded by this; it is only read or written while holding the monitor
+    private boolean closed;
+    private List<GlobalCheckpointListener> listeners;
+    private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;
+
+    private final ShardId shardId;
+    private final Executor executor;
+    private final Logger logger;
+
+    /**
+     * Construct a global checkpoint listeners collection.
+     *
+     * @param shardId  the shard ID on which global checkpoint updates can be listened to
+     * @param executor the executor for listener notifications
+     * @param logger   a shard-level logger
+     */
+    GlobalCheckpointListeners(
+            final ShardId shardId,
+            final Executor executor,
+            final Logger logger) {
+        this.shardId = Objects.requireNonNull(shardId);
+        this.executor = Objects.requireNonNull(executor);
+        this.logger = Objects.requireNonNull(logger);
+    }
+
+    /**
+     * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
+     * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
+     * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
+     * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
+     * is closed. A listener must re-register after one of these events to receive subsequent events.
+     *
+     * @param currentGlobalCheckpoint the current global checkpoint known to the listener
+     * @param listener                the listener
+     */
+    synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
+        if (closed) {
+            executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
+            return;
+        }
+        if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
+            // snapshot the guarded field while holding the lock; the task runs later on another thread and must not re-read it
+            final long globalCheckpoint = lastKnownGlobalCheckpoint;
+            executor.execute(() -> notifyListener(listener, globalCheckpoint, null));
+            return;
+        }
+        if (listeners == null) {
+            listeners = new ArrayList<>();
+        }
+        listeners.add(listener);
+    }
+
+    /**
+     * Marks this collection as closed and notifies all pending listeners that the shard is closed. Listeners added after this call are
+     * notified of the closed shard instead of being registered.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        closed = true;
+        notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
+    }
+
+    /**
+     * The number of listeners that are registered and have not yet been handed to the executor.
+     *
+     * @return the number of pending listeners
+     */
+    synchronized int pendingListeners() {
+        return listeners == null ? 0 : listeners.size();
+    }
+
+    /**
+     * Invoke to notify all registered listeners of an updated global checkpoint.
+     *
+     * @param globalCheckpoint the updated global checkpoint
+     */
+    synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
+        assert globalCheckpoint >= NO_OPS_PERFORMED;
+        assert globalCheckpoint > lastKnownGlobalCheckpoint
+                : "updated global checkpoint [" + globalCheckpoint + "]"
+                + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]";
+        lastKnownGlobalCheckpoint = globalCheckpoint;
+        notifyListeners(globalCheckpoint, null);
+    }
+
+    /**
+     * Hands all registered listeners to the executor as a single task and clears the registry. The caller must hold the lock.
+     *
+     * @param globalCheckpoint the updated global checkpoint, or {@code UNASSIGNED_SEQ_NO} if the shard is closed
+     * @param e                the shard closed exception, non-null if and only if the shard is closed
+     */
+    private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
+        assert Thread.holdsLock(this);
+        assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
+        if (listeners != null) {
+            // capture the current listeners so that the task works on a list that is no longer reachable from this collection
+            final List<GlobalCheckpointListener> currentListeners = listeners;
+            listeners = null;
+            executor.execute(() -> {
+                for (final GlobalCheckpointListener listener : currentListeners) {
+                    notifyListener(listener, globalCheckpoint, e);
+                }
+            });
+        }
+    }
+
+    /**
+     * Notifies a single listener. Any exception thrown by the listener is logged and not propagated.
+     *
+     * @param listener         the listener to notify
+     * @param globalCheckpoint the updated global checkpoint, or {@code UNASSIGNED_SEQ_NO} if the shard is closed
+     * @param e                the shard closed exception, or null if the global checkpoint was updated
+     */
+    private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) {
+        try {
+            listener.accept(globalCheckpoint, e);
+        } catch (final Exception caught) {
+            if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
+                logger.warn(
+                    new ParameterizedMessage(
+                        "error notifying global checkpoint listener of updated global checkpoint [{}]",
+                        globalCheckpoint),
+                    caught);
+            } else {
+                logger.warn("error notifying global checkpoint listener of closed shard", caught);
+            }
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 08a0111fb4d..ffce0e6ea8b 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -161,6 +161,8 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.index.mapper.SourceToParse.source;
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
 
@@ -189,6 +191,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     private final SearchOperationListener searchOperationListener;
 
+    private final GlobalCheckpointListeners globalCheckpointListeners;
     private final ReplicationTracker replicationTracker;
 
     protected volatile ShardRouting shardRouting;
@@ -298,8 +301,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
         this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
         final String aId = shardRouting.allocationId().getId();
+        this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
         this.replicationTracker =
-                new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {});
+                new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
+
         // the query cache is a node-level thing, however we want the most popular filters
         // to be computed on a per-shard basis
         if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -664,7 +669,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
                                                            long autoGeneratedTimestamp, boolean isRetry) throws IOException {
         assert versionType.validateVersionForWrites(version);
-        return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
+        return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
             isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
     }
 
@@ -765,7 +770,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
         throws IOException {
         assert versionType.validateVersionForWrites(version);
-        return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
+        return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
             Engine.Operation.Origin.PRIMARY);
     }
 
@@ -1192,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             } finally {
                 // playing safe here and close the engine even if the above succeeds - close can be called multiple times
                 // Also closing refreshListeners to prevent us from accumulating any more listeners
-                IOUtils.close(engine, refreshListeners);
+                IOUtils.close(engine, globalCheckpointListeners, refreshListeners);
                 indexShardOperationPermits.close();
             }
         }
@@ -1729,6 +1734,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
     }
 
+    /**
+     * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
+     * listener is notified right away, on the listener thread pool executor given to the listeners collection, not the calling thread.
+     *
+     * @param currentGlobalCheckpoint the current global checkpoint known to the listener
+     * @param listener                the listener
+     */
+    public void addGlobalCheckpointListener(
+            final long currentGlobalCheckpoint,
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener) {
+        this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener);
+    }
+
     /**
      * Waits for all operations up to the provided sequence number to complete.
      *
@@ -2273,8 +2291,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
                         final long currentGlobalCheckpoint = getGlobalCheckpoint();
                         final long localCheckpoint;
-                        if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
-                            localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
+                        if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
+                            localCheckpoint = NO_OPS_PERFORMED;
                         } else {
                             localCheckpoint = currentGlobalCheckpoint;
                         }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java
new file mode 100644
index 00000000000..d9240602d85
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.test.ESTestCase;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class GlobalCheckpointListenersTests extends ESTestCase {
+
+    final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
+
+    public void testGlobalCheckpointUpdated() throws IOException {
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final long[] globalCheckpoints = new long[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final AtomicBoolean invoked = new AtomicBoolean();
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (g, e) -> {
+                        if (invoked.compareAndSet(false, true) == false) {
+                            throw new IllegalStateException("listener invoked twice");
+                        }
+                        assert g != UNASSIGNED_SEQ_NO;
+                        assert e == null;
+                        globalCheckpoints[index] = g;
+                    };
+            globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
+        }
+        final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE - 1); // must advance, with room to advance again
+        globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+        }
+
+        // test the listeners are not invoked twice
+        final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE);
+        globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint);
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+        }
+
+        // closing should also not notify the listeners
+        globalCheckpointListeners.close();
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+        }
+    }
+
+    public void testListenersReadyToBeNotified() throws IOException {
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
+        final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE - 1); // leave room for the next update
+        globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final long[] globalCheckpoints = new long[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final AtomicBoolean invoked = new AtomicBoolean();
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (g, e) -> {
+                        if (invoked.compareAndSet(false, true) == false) {
+                            throw new IllegalStateException("listener invoked twice");
+                        }
+                        assert g != UNASSIGNED_SEQ_NO;
+                        assert e == null;
+                        globalCheckpoints[index] = g;
+                    };
+            globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener);
+            // the listener should be notified immediately
+            assertThat(globalCheckpoints[index], equalTo(globalCheckpoint));
+        }
+
+        // test the listeners are not invoked twice
+        final long nextGlobalCheckpoint = randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE);
+        globalCheckpointListeners.globalCheckpointUpdated(nextGlobalCheckpoint);
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+        }
+
+        // closing should also not notify the listeners
+        globalCheckpointListeners.close();
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+        }
+    }
+
+    public void testFailingListenerReadyToBeNotified() {
+        final Logger mockLogger = mock(Logger.class);
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
+        final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE);
+        globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final long[] globalCheckpoints = new long[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final boolean failure = randomBoolean();
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (g, e) -> {
+                        assert g != UNASSIGNED_SEQ_NO; // check the delivered value, not the captured local
+                        assert e == null;
+                        if (failure) {
+                            globalCheckpoints[index] = Long.MIN_VALUE;
+                            throw new RuntimeException("failure");
+                        } else {
+                            globalCheckpoints[index] = g;
+                        }
+                    };
+            globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener);
+            // the listener should be notified immediately
+            if (failure) {
+                assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE));
+                final ArgumentCaptor<ParameterizedMessage> message = ArgumentCaptor.forClass(ParameterizedMessage.class);
+                final ArgumentCaptor<RuntimeException> t = ArgumentCaptor.forClass(RuntimeException.class);
+                verify(mockLogger).warn(message.capture(), t.capture());
+                reset(mockLogger);
+                assertThat(
+                        message.getValue().getFormat(),
+                        equalTo("error notifying global checkpoint listener of updated global checkpoint [{}]"));
+                assertNotNull(message.getValue().getParameters());
+                assertThat(message.getValue().getParameters().length, equalTo(1));
+                assertThat(message.getValue().getParameters()[0], equalTo(globalCheckpoint));
+                assertNotNull(t.getValue());
+                assertThat(t.getValue().getMessage(), equalTo("failure"));
+            } else {
+                assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+            }
+        }
+    }
+
+    public void testClose() throws IOException {
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final AtomicBoolean invoked = new AtomicBoolean();
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (globalCheckpoint, e) -> {
+                        if (invoked.compareAndSet(false, true) == false) {
+                            throw new IllegalStateException("listener invoked twice");
+                        }
+                        assert globalCheckpoint == UNASSIGNED_SEQ_NO;
+                        assert e != null;
+                        exceptions[index] = e;
+                    };
+            globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
+        }
+        globalCheckpointListeners.close();
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertNotNull(exceptions[i]);
+            assertThat(exceptions[i].getShardId(), equalTo(shardId));
+        }
+
+        // test the listeners are not invoked twice
+        for (int i = 0; i < numberOfListeners; i++) {
+            exceptions[i] = null;
+        }
+        globalCheckpointListeners.close();
+        for (int i = 0; i < numberOfListeners; i++) {
+            assertNull(exceptions[i]);
+        }
+    }
+
+    public void testAddAfterClose() throws InterruptedException, IOException {
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        globalCheckpointListeners.close();
+        final AtomicBoolean invoked = new AtomicBoolean();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> {
+            assert g == UNASSIGNED_SEQ_NO;
+            assert e != null;
+            if (invoked.compareAndSet(false, true) == false) {
+                latch.countDown();
+                throw new IllegalStateException("listener invoked twice");
+            }
+            latch.countDown();
+        };
+        globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener);
+        latch.await();
+        assertTrue(invoked.get());
+    }
+
+    public void testFailingListenerOnUpdate() {
+        final Logger mockLogger = mock(Logger.class);
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final boolean[] failures = new boolean[numberOfListeners];
+        final long[] globalCheckpoints = new long[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final boolean failure = randomBoolean();
+            failures[index] = failure;
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (g, e) -> {
+                        assert g != UNASSIGNED_SEQ_NO;
+                        assert e == null;
+                        if (failure) {
+                            globalCheckpoints[index] = Long.MIN_VALUE;
+                            throw new RuntimeException("failure");
+                        } else {
+                            globalCheckpoints[index] = g;
+                        }
+                    };
+            globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
+        }
+        final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); // must advance past the initial update
+        globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
+        for (int i = 0; i < numberOfListeners; i++) {
+            if (failures[i]) {
+                assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE));
+            } else {
+                assertThat(globalCheckpoints[i], equalTo(globalCheckpoint));
+            }
+        }
+        int failureCount = 0;
+        for (int i = 0; i < numberOfListeners; i++) {
+            if (failures[i]) {
+                failureCount++;
+            }
+        }
+        if (failureCount > 0) {
+            final ArgumentCaptor<ParameterizedMessage> message = ArgumentCaptor.forClass(ParameterizedMessage.class);
+            final ArgumentCaptor<RuntimeException> t = ArgumentCaptor.forClass(RuntimeException.class);
+            verify(mockLogger, times(failureCount)).warn(message.capture(), t.capture());
+            assertThat(
+                    message.getValue().getFormat(),
+                    equalTo("error notifying global checkpoint listener of updated global checkpoint [{}]"));
+            assertNotNull(message.getValue().getParameters());
+            assertThat(message.getValue().getParameters().length, equalTo(1));
+            assertThat(message.getValue().getParameters()[0], equalTo(globalCheckpoint));
+            assertNotNull(t.getValue());
+            assertThat(t.getValue().getMessage(), equalTo("failure"));
+        }
+    }
+
+    public void testFailingListenerOnClose() throws IOException {
+        final Logger mockLogger = mock(Logger.class);
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        final boolean[] failures = new boolean[numberOfListeners];
+        final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners];
+        for (int i = 0; i < numberOfListeners; i++) {
+            final int index = i;
+            final boolean failure = randomBoolean();
+            failures[index] = failure;
+            final GlobalCheckpointListeners.GlobalCheckpointListener listener =
+                    (g, e) -> {
+                        assert g == UNASSIGNED_SEQ_NO;
+                        assert e != null;
+                        if (failure) {
+                            throw new RuntimeException("failure");
+                        } else {
+                            exceptions[index] = e;
+                        }
+                    };
+            globalCheckpointListeners.add(NO_OPS_PERFORMED, listener);
+        }
+        globalCheckpointListeners.close();
+        for (int i = 0; i < numberOfListeners; i++) {
+            if (failures[i]) {
+                assertNull(exceptions[i]);
+            } else {
+                assertNotNull(exceptions[i]);
+                assertThat(exceptions[i].getShardId(), equalTo(shardId));
+            }
+        }
+        int failureCount = 0;
+        for (int i = 0; i < numberOfListeners; i++) {
+            if (failures[i]) {
+                failureCount++;
+            }
+        }
+        if (failureCount > 0) {
+            final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
+            final ArgumentCaptor<RuntimeException> t = ArgumentCaptor.forClass(RuntimeException.class);
+            verify(mockLogger, times(failureCount)).warn(message.capture(), t.capture());
+            assertThat(message.getValue(), equalTo("error notifying global checkpoint listener of closed shard"));
+            assertNotNull(t.getValue());
+            assertThat(t.getValue().getMessage(), equalTo("failure"));
+        }
+    }
+
+    public void testNotificationUsesExecutor() {
+        final AtomicInteger count = new AtomicInteger();
+        final Executor executor = command -> {
+            count.incrementAndGet();
+            command.run();
+        };
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
+        globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
+        final int numberOfListeners = randomIntBetween(0, 16);
+        for (int i = 0; i < numberOfListeners; i++) {
+            globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {});
+        }
+        globalCheckpointListeners.globalCheckpointUpdated(randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE));
+        assertThat(count.get(), equalTo(numberOfListeners == 0 ? 0 : 1)); // nothing is submitted when no listener is registered
+    }
+
+    public void testConcurrency() throws BrokenBarrierException, InterruptedException {
+        final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
+        final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
+        final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
+        globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
+        // we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread
+        final CyclicBarrier barrier = new CyclicBarrier(3);
+        final int numberOfIterations = randomIntBetween(1, 1024);
+        final AtomicBoolean closed = new AtomicBoolean();
+        final Thread updatingThread = new Thread(() -> {
+            // synchronize starting with the listener thread and the main test thread
+            awaitQuietly(barrier);
+            for (int i = 0; i < numberOfIterations; i++) {
+                if (rarely() && closed.get() == false) {
+                    closed.set(true);
+                    try {
+                        globalCheckpointListeners.close();
+                    } catch (final IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+                if (closed.get() == false) {
+                    globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
+                }
+            }
+            // synchronize ending with the listener thread and the main test thread
+            awaitQuietly(barrier);
+        });
+
+        final List<AtomicBoolean> invocations = new CopyOnWriteArrayList<>();
+        final Thread listenersThread = new Thread(() -> {
+            // synchronize starting with the updating thread and the main test thread
+            awaitQuietly(barrier);
+            for (int i = 0; i < numberOfIterations; i++) {
+                final AtomicBoolean invocation = new AtomicBoolean();
+                invocations.add(invocation);
+                // sometimes this will notify the listener immediately
+                globalCheckpointListeners.add(
+                        globalCheckpoint.get(),
+                        (g, e) -> {
+                            if (invocation.compareAndSet(false, true) == false) {
+                                throw new IllegalStateException("listener invoked twice");
+                            }
+                        });
+            }
+            // synchronize ending with the updating thread and the main test thread
+            awaitQuietly(barrier);
+        });
+        updatingThread.start();
+        listenersThread.start();
+        // synchronize starting with the updating thread and the listener thread
+        barrier.await();
+        // synchronize ending with the updating thread and the listener thread
+        barrier.await();
+        // one last update to ensure all listeners are notified
+        if (closed.get() == false) {
+            globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
+        }
+        assertThat(globalCheckpointListeners.pendingListeners(), equalTo(0));
+        executor.shutdown();
+        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        for (final AtomicBoolean invocation : invocations) {
+            assertTrue(invocation.get());
+        }
+        updatingThread.join();
+        listenersThread.join();
+    }
+
+    private void awaitQuietly(final CyclicBarrier barrier) {
+        try {
+            barrier.await();
+        } catch (final BrokenBarrierException | InterruptedException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
index bda6de8aa7d..182747e7dda 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -91,6 +91,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
@@ -101,6 +102,8 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -729,4 +732,50 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         assertTrue(shard.isSearchIdle());
         assertHitCount(client().prepareSearch().get(), 3);
     }
+
+    public void testGlobalCheckpointListeners() throws Exception {
+        createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
+        ensureGreen();
+        final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        final IndexService test = indicesService.indexService(resolveIndex("test"));
+        final IndexShard shard = test.getShardOrNull(0);
+        final int numberOfUpdates = randomIntBetween(1, 128);
+        for (int i = 0; i < numberOfUpdates; i++) {
+            final int index = i;
+            // seed with a sentinel rather than the default 0, which equals the expected value on the first iteration
+            final AtomicLong globalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
+            shard.addGlobalCheckpointListener(
+                    i - 1,
+                    (g, e) -> {
+                        assert g >= NO_OPS_PERFORMED;
+                        assert e == null;
+                        globalCheckpoint.set(g);
+                    });
+            client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
+            assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index)));
+            // adding a listener expecting a lower global checkpoint should fire immediately
+            final AtomicLong immediateGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
+            shard.addGlobalCheckpointListener(
+                    randomLongBetween(NO_OPS_PERFORMED, i - 1),
+                    (g, e) -> {
+                        assert g >= NO_OPS_PERFORMED;
+                        assert e == null;
+                        immediateGlobalCheckpoint.set(g);
+                    });
+            assertBusy(() -> assertThat(immediateGlobalCheckpoint.get(), equalTo((long) index)));
+        }
+        final AtomicBoolean invoked = new AtomicBoolean();
+        shard.addGlobalCheckpointListener(
+                numberOfUpdates - 1,
+                (g, e) -> {
+                    assert g == UNASSIGNED_SEQ_NO;
+                    assert e != null;
+                    assertThat(e.getShardId(), equalTo(shard.shardId()));
+                    // flag completion only after the checks so that a failed check is not reported as a successful notification
+                    invoked.set(true);
+                });
+        shard.close("closed", randomBoolean());
+        assertBusy(() -> assertTrue(invoked.get()));
+    }
+
 }