Do not wait for advancement of checkpoint in recovery (#39006)
With this change, we won't wait for the local checkpoint to advance to the max_seq_no before starting phase2 of peer-recovery. We also remove the sequence number range check in peer-recovery. We can safely do both thanks to Yannick's finding.

The replication group to be used is currently sampled after indexing into the primary (see the `ReplicationOperation` class). This means that when initiating tracking of a new replica, we have to consider the following two cases:

- There are operations for which the replication group has not been sampled yet. As we initiated the new replica as tracking, we know that those operations will be replicated to the new replica and follow the typical replication group semantics (e.g. marked as stale when unavailable).

- There are operations for which the replication group has already been sampled. These operations will not be sent to the new replica. However, we know that those operations are already indexed into Lucene and the translog on the primary, as the sampling happens after that. This means that by taking a snapshot of Lucene or the translog, we will get those ops as well. What we can no longer guarantee is that all ops up to `endingSeqNo` are available in the snapshot (i.e. also see the comment in `RecoverySourceHandler` saying `We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all operations in the required range will be available for replaying from the translog of the source.`). This guarantee is not needed, though, which is why we no longer wait for the local checkpoint to reach the max seq no.

Relates #39000
Closes #38949

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
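To make the argument above concrete, here is a small, self-contained toy model in plain Java. It is not Elasticsearch code; the class `RecoveryCoverageModel` and the variables `primaryHistory`, `replicatedToNewReplica` and `phase2Snapshot` are invented for illustration, and the flow is deliberately simplified. It plays through the two cases: ops whose replication group was sampled before the new replica started tracking are already in the primary's history and therefore show up in any snapshot taken afterwards, while ops sampled after that point are replicated directly. Either way, every op up to `endingSeqNo` reaches the recovering copy without waiting for the local checkpoint to reach max_seq_no.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecoveryCoverageModel {
    public static void main(String[] args) {
        List<Long> primaryHistory = new ArrayList<>();       // ops indexed into Lucene/translog on the primary
        Set<Long> replicatedToNewReplica = new HashSet<>();  // ops sent through normal replication

        long seqNo = 0;
        // Case 2 from the commit message: these ops sampled a replication group *before*
        // the new replica was tracked, so they are never sent to it directly,
        // but they are already part of the primary's history.
        for (int i = 0; i < 5; i++) {
            primaryHistory.add(seqNo++);
        }

        // In the real recovery flow, initiateTracking(targetAllocationId) happens here.

        // Case 1: ops indexed after tracking started sample a replication group that
        // contains the new replica and follow the usual replication-group semantics.
        for (int i = 0; i < 5; i++) {
            primaryHistory.add(seqNo);
            replicatedToNewReplica.add(seqNo);
            seqNo++;
        }

        long endingSeqNo = seqNo - 1;                         // stands in for shard.seqNoStats().getMaxSeqNo()
        // The phase2 snapshot is taken after tracking was initiated, so it contains
        // everything that was already in the primary's history at that point.
        Set<Long> phase2Snapshot = new HashSet<>(primaryHistory);

        // Every op up to endingSeqNo is covered either by direct replication or by the snapshot.
        for (long op = 0; op <= endingSeqNo; op++) {
            if (!replicatedToNewReplica.contains(op) && !phase2Snapshot.contains(op)) {
                throw new AssertionError("op " + op + " would be lost");
            }
        }
        System.out.println("every op up to " + endingSeqNo + " is covered by replication or the snapshot");
    }
}
```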
parent 236db51d34
commit 48219112e3
@@ -808,14 +808,6 @@ public abstract class Engine implements Closeable {
      */
     public abstract long getLocalCheckpoint();

-    /**
-     * Waits for all operations up to the provided sequence number to complete.
-     *
-     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
-     * @throws InterruptedException if the thread was interrupted while blocking on the condition
-     */
-    public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
-
     /**
      * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
      */

@@ -2424,11 +2424,6 @@ public class InternalEngine extends Engine {
         return localCheckpointTracker.getCheckpoint();
     }

-    @Override
-    public void waitForOpsToComplete(long seqNo) throws InterruptedException {
-        localCheckpointTracker.waitForOpsToComplete(seqNo);
-    }
-
     /**
      * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
      */

@@ -321,10 +321,6 @@ public class ReadOnlyEngine extends Engine {
         return seqNoStats.getLocalCheckpoint();
     }

-    @Override
-    public void waitForOpsToComplete(long seqNo) {
-    }
-
     @Override
     public SeqNoStats getSeqNoStats(long globalCheckpoint) {
         return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);

@@ -2042,16 +2042,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }

-    /**
-     * Waits for all operations up to the provided sequence number to complete.
-     *
-     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
-     * @throws InterruptedException if the thread was interrupted while blocking on the condition
-     */
-    public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
-        getEngine().waitForOpsToComplete(seqNo);
-    }
-
     /**
      * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
      * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.

@@ -158,14 +158,12 @@ public class RecoverySourceHandler {
         final Closeable retentionLock = shard.acquireRetentionLock();
         resources.add(retentionLock);
         final long startingSeqNo;
-        final long requiredSeqNoRangeStart;
         final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
             isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
         final SendFileResult sendFileResult;
         if (isSequenceNumberBasedRecovery) {
             logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
             startingSeqNo = request.startingSeqNo();
-            requiredSeqNoRangeStart = startingSeqNo;
             sendFileResult = SendFileResult.EMPTY;
         } else {
             final Engine.IndexCommitRef phase1Snapshot;

@@ -174,9 +172,6 @@ public class RecoverySourceHandler {
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
             }
-            // We must have everything above the local checkpoint in the commit
-            requiredSeqNoRangeStart =
-                Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
             // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
             // still filter out legacy operations without seqNo.
             startingSeqNo = 0;

@@ -194,8 +189,6 @@ public class RecoverySourceHandler {
             }
         }
         assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
-        assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
-            + startingSeqNo + "]";

         final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
         // For a sequence based recovery, the target can keep its local translog

@@ -213,13 +206,7 @@ public class RecoverySourceHandler {
                 shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

             final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-            /*
-             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-             * operations in the required range will be available for replaying from the translog of the source.
-             */
-            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
             if (logger.isTraceEnabled()) {
-                logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
                 logger.trace("snapshot translog for recovery; current size is [{}]",
                     shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
             }

@@ -232,15 +219,8 @@ public class RecoverySourceHandler {
             final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
             final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
             final RetentionLeases retentionLeases = shard.getRetentionLeases();
-            phase2(
-                startingSeqNo,
-                requiredSeqNoRangeStart,
-                endingSeqNo,
-                phase2Snapshot,
-                maxSeenAutoIdTimestamp,
-                maxSeqNoOfUpdatesOrDeletes,
-                retentionLeases,
-                sendSnapshotStep);
+            phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
+                retentionLeases, sendSnapshotStep);
             sendSnapshotStep.whenComplete(
                 r -> IOUtils.close(phase2Snapshot),
                 e -> {

@@ -518,7 +498,6 @@ public class RecoverySourceHandler {
      *
      * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
      *                      ops should be sent
-     * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
      * @param endingSeqNo the highest sequence number that should be sent
      * @param snapshot a snapshot of the translog
      * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary

@@ -527,26 +506,19 @@ public class RecoverySourceHandler {
      */
     void phase2(
             final long startingSeqNo,
-            final long requiredSeqNoRangeStart,
             final long endingSeqNo,
             final Translog.Snapshot snapshot,
             final long maxSeenAutoIdTimestamp,
             final long maxSeqNoOfUpdatesOrDeletes,
             final RetentionLeases retentionLeases,
             final ActionListener<SendSnapshotResult> listener) throws IOException {
-        assert requiredSeqNoRangeStart <= endingSeqNo + 1:
-            "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
-        assert startingSeqNo <= requiredSeqNoRangeStart :
-            "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
         if (shard.state() == IndexShardState.CLOSED) {
             throw new IndexShardClosedException(request.shardId());
         }
-        logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
-            "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
+        logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");

         final AtomicInteger skippedOps = new AtomicInteger();
         final AtomicInteger totalSentOps = new AtomicInteger();
-        final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
         final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
         final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
             // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.

@@ -568,7 +540,6 @@ public class RecoverySourceHandler {
                     ops.add(operation);
                     batchSizeInBytes += operation.estimateSize();
                     totalSentOps.incrementAndGet();
-                    requiredOpsTracker.markSeqNoAsCompleted(seqNo);

                     // check if this request is past bytes threshold, and if so, send it off
                     if (batchSizeInBytes >= chunkSizeInBytes) {

@@ -586,11 +557,6 @@ public class RecoverySourceHandler {
         assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
             : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
             snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
-        if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
-            throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
-                " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
-                + (requiredOpsTracker.getCheckpoint() + 1) + "]");
-        }
         stopWatch.stop();
         final TimeValue tookTime = stopWatch.totalTime();
         logger.trace("recovery [phase2]: took [{}]", tookTime);

@@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import java.util.zip.CRC32;

 import static java.util.Collections.emptyMap;

@@ -231,8 +230,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
         }
         final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
-        final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
-        final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
+        final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);

         final List<Translog.Operation> shippedOps = new ArrayList<>();
         final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

@@ -247,7 +245,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
         PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
-        handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
+        handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
             randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
         final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
         RecoverySourceHandler.SendSnapshotResult result = future.actionGet();

@@ -258,18 +256,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
         }
         assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
-        if (endingSeqNo >= requiredStartingSeqNo + 1) {
-            // check that missing ops blows up
-            List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
-                .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
-            List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
-            PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
-            expectThrows(IllegalStateException.class, () -> {
-                handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
-                    randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture);
-                failedFuture.actionGet();
-            });
-        }
     }

     public void testSendSnapshotStopOnError() throws Exception {

@@ -299,7 +285,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
         final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
         final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
-        handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
+        handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
             randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
         if (wasFailed.get()) {
             assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));

@@ -498,11 +484,11 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }

             @Override
-            void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
+            void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot,
                         long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases,
                         ActionListener<SendSnapshotResult> listener) throws IOException {
                 phase2Called.set(true);
-                super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
+                super.phase2(startingSeqNo, endingSeqNo, snapshot,
                     maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener);
             }

@@ -1109,6 +1109,16 @@ public abstract class EngineTestCase extends ESTestCase {
         return internalEngine.getTranslog();
     }

+    /**
+     * Waits for all operations up to the provided sequence number to complete in the given internal engine.
+     *
+     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
+     * @throws InterruptedException if the thread was interrupted while blocking on the condition
+     */
+    public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException {
+        engine.getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
+    }
+
     public static boolean hasSnapshottedCommits(Engine engine) {
         assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
         InternalEngine internalEngine = (InternalEngine) engine;

@@ -114,7 +114,6 @@ import static org.hamcrest.Matchers.nullValue;

 public class IndexFollowingIT extends CcrIntegTestCase {

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
     public void testFollowIndex() throws Exception {
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         int numberOfReplicas = between(0, 1);

@@ -221,7 +220,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
     public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),

@@ -338,7 +338,7 @@ public class FollowingEngineTests extends ESTestCase {
             for (int i = 0; i < numDocs; i++) {
                 leader.index(indexForPrimary(Integer.toString(i)));
             }
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
             assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));

@@ -351,7 +351,7 @@ public class FollowingEngineTests extends ESTestCase {
                     leader.delete(deleteForPrimary(Integer.toString(i)));
                 }
             }
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
             assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));

@@ -363,7 +363,7 @@ public class FollowingEngineTests extends ESTestCase {
                 docIds.add(docId);
                 leader.index(indexForPrimary(docId));
             }
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs));
             assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));

@@ -379,7 +379,7 @@ public class FollowingEngineTests extends ESTestCase {
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
             assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps));
         });
     }

@@ -397,13 +397,13 @@ public class FollowingEngineTests extends ESTestCase {
         Randomness.shuffle(ops);
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             final List<Engine.Operation> appendOps = new ArrayList<>();
             for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) {
                 appendOps.add(indexForPrimary("append-" + i));
             }
             EngineTestCase.concurrentlyApplyOps(appendOps, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size()));
         });
     }

@@ -411,19 +411,19 @@ public class FollowingEngineTests extends ESTestCase {
     public void testOptimizeSingleDocSequentially() throws Exception {
         runFollowTest((leader, follower) -> {
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));

             leader.delete(deleteForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));

             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));

             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));
         });
     }

@@ -433,20 +433,20 @@ public class FollowingEngineTests extends ESTestCase {
         Randomness.shuffle(ops);
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
             long numOptimized = follower.getNumberOfOptimizedIndexing();

             leader.delete(deleteForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized));

             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));

             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));
         });
     }

@@ -473,7 +473,7 @@ public class FollowingEngineTests extends ESTestCase {
                 latch.countDown();
                 latch.await();
                 task.accept(leader, follower);
-                follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+                EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             } finally {
                 taskIsCompleted.set(true);
                 for (Thread thread : threads) {