diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index dbe779864fe..de393881c89 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -808,14 +808,6 @@ public abstract class Engine implements Closeable { */ public abstract long getLocalCheckpoint(); - /** - * Waits for all operations up to the provided sequence number to complete. - * - * @param seqNo the sequence number that the checkpoint must advance to before this method returns - * @throws InterruptedException if the thread was interrupted while blocking on the condition - */ - public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException; - /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 65dae9c4598..42d258bfed4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2424,11 +2424,6 @@ public class InternalEngine extends Engine { return localCheckpointTracker.getCheckpoint(); } - @Override - public void waitForOpsToComplete(long seqNo) throws InterruptedException { - localCheckpointTracker.waitForOpsToComplete(seqNo); - } - /** * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 230b550539e..62650a470e2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -321,10 +321,6 @@ public class ReadOnlyEngine extends Engine { return seqNoStats.getLocalCheckpoint(); } - @Override - public void waitForOpsToComplete(long seqNo) { - } - @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 27c3d5eabcf..bb92b419ae2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2042,16 +2042,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } - /** - * Waits for all operations up to the provided sequence number to complete. - * - * @param seqNo the sequence number that the checkpoint must advance to before this method returns - * @throws InterruptedException if the thread was interrupted while blocking on the condition - */ - public void waitForOpsToComplete(final long seqNo) throws InterruptedException { - getEngine().waitForOpsToComplete(seqNo); - } - /** * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e7a8fbfb523..6bca848a361 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -158,14 +158,12 @@ public class RecoverySourceHandler { final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); final long startingSeqNo; - final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); final SendFileResult sendFileResult; if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - requiredSeqNoRangeStart = startingSeqNo; sendFileResult = SendFileResult.EMPTY; } else { final Engine.IndexCommitRef phase1Snapshot; @@ -174,9 +172,6 @@ public class RecoverySourceHandler { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // We must have everything above the local checkpoint in the commit - requiredSeqNoRangeStart = - Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will // still filter out legacy operations without seqNo. startingSeqNo = 0; @@ -194,8 +189,6 @@ public class RecoverySourceHandler { } } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" - + startingSeqNo + "]"; final StepListener prepareEngineStep = new StepListener<>(); // For a sequence based recovery, the target can keep its local translog @@ -213,13 +206,7 @@ public class RecoverySourceHandler { shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. - */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); if (logger.isTraceEnabled()) { - logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } @@ -232,15 +219,8 @@ public class RecoverySourceHandler { final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); final RetentionLeases retentionLeases = shard.getRetentionLeases(); - phase2( - startingSeqNo, - requiredSeqNoRangeStart, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - sendSnapshotStep); + phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + retentionLeases, sendSnapshotStep); sendSnapshotStep.whenComplete( r -> IOUtils.close(phase2Snapshot), e -> { @@ -518,7 +498,6 @@ public class RecoverySourceHandler { * * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all * ops should be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary @@ -527,26 +506,19 @@ public class RecoverySourceHandler { */ void phase2( final long startingSeqNo, - final long requiredSeqNoRangeStart, final long endingSeqNo, final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes, final RetentionLeases retentionLeases, final ActionListener listener) throws IOException { - assert requiredSeqNoRangeStart <= endingSeqNo + 1: - "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; - assert startingSeqNo <= requiredSeqNoRangeStart : - "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + - "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); + logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); - final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch. final CheckedSupplier, IOException> readNextBatch = () -> { // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. @@ -568,7 +540,6 @@ public class RecoverySourceHandler { ops.add(operation); batchSizeInBytes += operation.estimateSize(); totalSentOps.incrementAndGet(); - requiredOpsTracker.markSeqNoAsCompleted(seqNo); // check if this request is past bytes threshold, and if so, send it off if (batchSizeInBytes >= chunkSizeInBytes) { @@ -586,11 +557,6 @@ public class RecoverySourceHandler { assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); - if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { - throw new IllegalStateException("translog replay failed to cover required sequence numbers" + - " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" - + (requiredOpsTracker.getCheckpoint() + 1) + "]"); - } stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); logger.trace("recovery [phase2]: took [{}]", tookTime); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 8391827b2f8..ae6eae86564 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.zip.CRC32; import static java.util.Collections.emptyMap; @@ -231,8 +230,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true))); } final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); - final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); - final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); + final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); final List shippedOps = new ArrayList<>(); final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -247,7 +245,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); - handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), + handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); @@ -258,18 +256,6 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get())); - if (endingSeqNo >= requiredStartingSeqNo + 1) { - // check that missing ops blows up - List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker - .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); - List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); - PlainActionFuture failedFuture = new PlainActionFuture<>(); - expectThrows(IllegalStateException.class, () -> { - handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), - randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture); - failedFuture.actionGet(); - }); - } } public void testSendSnapshotStopOnError() throws Exception { @@ -299,7 +285,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); - handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), + handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); if (wasFailed.get()) { assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); @@ -498,11 +484,11 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener listener) throws IOException { phase2Called.set(true); - super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + super.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index a6765e4e44f..e7b3f397471 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1109,6 +1109,16 @@ public abstract class EngineTestCase extends ESTestCase { return internalEngine.getTranslog(); } + /** + * Waits for all operations up to the provided sequence number to complete in the given internal engine. + * + * @param seqNo the sequence number that the checkpoint must advance to before this method returns + * @throws InterruptedException if the thread was interrupted while blocking on the condition + */ + public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException { + engine.getLocalCheckpointTracker().waitForOpsToComplete(seqNo); + } + public static boolean hasSnapshottedCommits(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index e0f71fe4515..d9b75f416b3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -114,7 +114,6 @@ import static org.hamcrest.Matchers.nullValue; public class IndexFollowingIT extends CcrIntegTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949") public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); int numberOfReplicas = between(0, 1); @@ -221,7 +220,6 @@ public class IndexFollowingIT extends CcrIntegTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949") public void testFollowIndexWithConcurrentMappingChanges() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 69fa23bd3fb..b8d44f76026 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -338,7 +338,7 @@ public class FollowingEngineTests extends ESTestCase { for (int i = 0; i < numDocs; i++) { leader.index(indexForPrimary(Integer.toString(i))); } - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); @@ -351,7 +351,7 @@ public class FollowingEngineTests extends ESTestCase { leader.delete(deleteForPrimary(Integer.toString(i))); } } - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); @@ -363,7 +363,7 @@ public class FollowingEngineTests extends ESTestCase { docIds.add(docId); leader.index(indexForPrimary(docId)); } - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); @@ -379,7 +379,7 @@ public class FollowingEngineTests extends ESTestCase { runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps)); }); } @@ -397,13 +397,13 @@ public class FollowingEngineTests extends ESTestCase { Randomness.shuffle(ops); runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); final List appendOps = new ArrayList<>(); for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) { appendOps.add(indexForPrimary("append-" + i)); } EngineTestCase.concurrentlyApplyOps(appendOps, leader); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size())); }); } @@ -411,19 +411,19 @@ public class FollowingEngineTests extends ESTestCase { public void testOptimizeSingleDocSequentially() throws Exception { runFollowTest((leader, follower) -> { leader.index(indexForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); leader.delete(deleteForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); leader.index(indexForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); leader.index(indexForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); }); } @@ -433,20 +433,20 @@ public class FollowingEngineTests extends ESTestCase { Randomness.shuffle(ops); runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); long numOptimized = follower.getNumberOfOptimizedIndexing(); leader.delete(deleteForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized)); leader.index(indexForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); leader.index(indexForPrimary("id")); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); }); } @@ -473,7 +473,7 @@ public class FollowingEngineTests extends ESTestCase { latch.countDown(); latch.await(); task.accept(leader, follower); - follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint()); } finally { taskIsCompleted.set(true); for (Thread thread : threads) {