[Refactor] LuceneChangesSnapshot to use accurate ops history (#2452)

Improves LuceneChangesSnapshot to report an accurate count of recovery
operations when one is requested, while retaining the sort-by-sequence-number
search optimization for callers that do not need an exact total.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
Nick Knize 2022-03-15 13:13:54 -05:00 committed by GitHub
parent b69dc335ad
commit 757abdb9a0
15 changed files with 174 additions and 59 deletions
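
The heart of the change is the totalHitsThreshold handed to Lucene's TopFieldCollector in searchOperations(): a threshold of 0 lets the seq_no-sorted search stop counting (and potentially skip non-competitive documents) once the batch is full, so totalHits may only be a lower bound, while Integer.MAX_VALUE forces an exact total. The following is a minimal, self-contained sketch of that behavior against plain Lucene, not OpenSearch code; the "seq_no" field name, the toy in-memory index, and the class name AccurateCountSketch are assumptions made for illustration.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class AccurateCountSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (long seqNo = 0; seqNo < 5_000; seqNo++) {
                    Document doc = new Document();
                    doc.add(new LongPoint("seq_no", seqNo));             // indexed point, for range queries
                    doc.add(new NumericDocValuesField("seq_no", seqNo)); // doc values, for sorting
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                Query range = LongPoint.newRangeQuery("seq_no", 0, Long.MAX_VALUE);
                Sort bySeqNo = new Sort(new SortField("seq_no", SortField.Type.LONG));
                int batchSize = 100;

                // accurateCount == false: threshold 0 lets the collector stop counting early,
                // so totalHits may be reported only as a lower bound ("100+ hits").
                TopFieldCollector fast = TopFieldCollector.create(bySeqNo, batchSize, null, 0);
                searcher.search(range, fast);
                TopDocs fastTop = fast.topDocs();
                System.out.println("optimized: " + fastTop.totalHits);

                // accurateCount == true: an unreachable threshold forces every match to be counted.
                TopFieldCollector exact = TopFieldCollector.create(bySeqNo, batchSize, null, Integer.MAX_VALUE);
                searcher.search(range, exact);
                TopDocs exactTop = exact.topDocs();
                System.out.println("accurate:  " + exactTop.totalHits); // exactly 5000 hits
            }
        }
    }
}

Note that only the first batch of the snapshot is built with the caller's accurateCount; subsequent pages call searchOperations((FieldDoc) prev, false), since the total has already been established.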

@ -1,5 +1,9 @@
---
"Return empty object if field doesn't exist, but index does":
- skip:
version: "all"
reason: "AwaitsFix https://github.com/opensearch-project/OpenSearch/issues/2440"
- do:
indices.create:
index: test_index

@ -764,7 +764,7 @@ public class IndexShardIT extends OpenSearchSingleNodeTestCase {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);

@ -735,8 +735,22 @@ public abstract class Engine implements Closeable {
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException;
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException;
/**
* Counts the number of history operations in the given sequence number range
* @param source source of the request
* @param fromSeqNo from sequence number; included
* @param toSeqNumber to sequence number; included
* @return number of history operations
*/
public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;
public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
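
To show how callers migrate to the new Engine methods above, here is a hedged usage sketch; it is not part of the commit, the class name HistoryOpsCountExample is invented, and the "peer-recovery" source string simply mirrors the one used later in this change.

import java.io.IOException;

import org.opensearch.index.engine.Engine;
import org.opensearch.index.translog.Translog;

final class HistoryOpsCountExample {

    // Previous pattern: build a full changes snapshot just to read its totalOperations(),
    // shown here against the new five-argument signature with accurateCount = true.
    static int countViaSnapshot(Engine engine, long startingSeqNo) throws IOException {
        try (Translog.Snapshot snapshot = engine.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false, true)) {
            return snapshot.totalOperations();
        }
    }

    // New pattern: ask the engine directly; no per-batch snapshot state is allocated.
    static int countViaEngine(Engine engine, long startingSeqNo) throws IOException {
        return engine.countNumberOfHistoryOperations("peer-recovery", startingSeqNo, Long.MAX_VALUE);
    }
}

RecoverySourceHandler and RecoveryTarget below switch from the first form to the second.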

@ -2772,7 +2772,13 @@ public class InternalEngine extends Engine {
}
@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
@ -2782,7 +2788,8 @@ public class InternalEngine extends Engine {
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo,
toSeqNo,
requiredFullRange
requiredFullRange,
accurateCount
);
searcher = null;
return snapshot;
@ -2798,6 +2805,21 @@ public class InternalEngine extends Engine {
}
}
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
try (Searcher s = acquireSearcher(source, SearcherScope.INTERNAL)) {
return LuceneChangesSnapshot.countNumberOfHistoryOperations(s, fromSeqNo, toSeqNo);
} catch (IOException e) {
try {
maybeFailEngine(source, e);
} catch (Exception innerException) {
e.addSuppressed(innerException);
}
throw e;
}
}
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return getMinRetainedSeqNo() <= startingSeqNo;
}

@ -38,16 +38,19 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.Version;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.SeqNoFieldMapper;
@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException {
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
int searchBatchSize,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(this.searchBatchSize);
final TopDocs topDocs = searchOperations(null);
final TopDocs topDocs = searchOperations(null, accurateCount);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
@ -187,7 +196,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
@ -236,16 +245,31 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
}
}
private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo),
BooleanClause.Occur.MUST
)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) {
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST) // exclude non-root nested docs
.build();
}
static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) {
throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]");
}
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo));
}
private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException {
final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
final TopFieldCollector topFieldCollector = TopFieldCollector.create(
sortedBySeqNo,
searchBatchSize,
after,
accurate ? Integer.MAX_VALUE : 0
);
indexSearcher.search(rangeQuery, topFieldCollector);
return topFieldCollector.topDocs();
}
private Translog.Operation readDocAsOp(int docIndex) throws IOException {
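
The dedicated count path is cheaper than draining a snapshot because IndexSearcher#count only needs the number of matches: it avoids the seq_no sort and the per-batch ParallelArray fill that the snapshot constructor performs. A minimal, self-contained illustration with plain Lucene follows; it omits the non-nested filter and the all-docs-live reader wrapping used by the real countNumberOfHistoryOperations, and the "seq_no" field and toy index are assumptions.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class CountOperationsSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (long seqNo = 0; seqNo < 1_000; seqNo++) {
                    Document doc = new Document();
                    doc.add(new LongPoint("seq_no", seqNo)); // range-queryable "operation"
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                // Exact number of "operations" with 100 <= seq_no <= 199: no sort, no stored fields.
                int ops = searcher.count(LongPoint.newRangeQuery("seq_no", 100, 199));
                System.out.println(ops); // 100
            }
        }
    }
}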

@ -325,10 +325,23 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) {
return newEmptySnapshot();
}
@Override
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
return snapshot.totalOperations();
}
}
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();

@ -2231,13 +2231,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo, boolean accurateCount)
throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}
/**
@ -2257,6 +2257,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return getEngine().getMinRetainedSeqNo();
}
/**
* Counts the number of history operations within the provided sequence numbers
* @param source source of the requester (e.g., peer-recovery)
* @param fromSeqNo from sequence number, included
* @param toSeqNo to sequence number, included
* @return number of history operations in the sequence number range
*/
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
return getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo);
}
/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
@ -2268,8 +2279,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
*/
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount);
}
public List<Segment> segments(boolean verbose) {

@ -104,7 +104,7 @@ public class PrimaryReplicaSyncer {
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override

@ -132,7 +132,7 @@ public class RecoverySourceHandler {
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
private static final String PEER_RECOVERY_NAME = "peer-recovery";
public static final String PEER_RECOVERY_NAME = "peer-recovery";
public RecoverySourceHandler(
IndexShard shard,
@ -272,7 +272,7 @@ public class RecoverySourceHandler {
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
try {
final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
@ -319,7 +319,7 @@ public class RecoverySourceHandler {
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
}, onFailure);
prepareEngineStep.whenComplete(prepareEngineTime -> {
@ -340,9 +340,15 @@ public class RecoverySourceHandler {
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (logger.isTraceEnabled()) {
logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
PEER_RECOVERY_NAME,
startingSeqNo,
Long.MAX_VALUE,
false,
true
);
resources.add(phase2Snapshot);
retentionLock.close();
@ -403,10 +409,13 @@ public class RecoverySourceHandler {
return targetHistoryUUID.equals(shard.getHistoryUUID());
}
private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
/**
* Counts the number of history operations from the starting sequence number
* @param startingSeqNo the starting sequence number to count; included
* @return number of history operations
*/
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE);
}
static void runUnderPrimaryPermit(

@ -344,11 +344,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (
Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
) {
return snapshot.totalOperations() > 0;
}
return indexShard.countNumberOfHistoryOperations(
RecoverySourceHandler.PEER_RECOVERY_NAME,
localCheckpointOfCommit + 1,
Long.MAX_VALUE
) > 0;
}
@Override

@ -6362,8 +6362,12 @@ public class InternalEngineTests extends EngineTestCase {
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
changes.close();
if (randomBoolean()) {
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
changes.close();
} else {
engine.countNumberOfHistoryOperations("test", min, max);
}
}
});
snapshotThreads[i].start();

@ -74,14 +74,14 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long fromSeqNo = randomNonNegativeLong();
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
// Empty engine
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(
error.getMessage(),
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")
);
}
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.size(0));
}
int numOps = between(1, 100);
@ -114,7 +114,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
false
false,
randomBoolean()
)
) {
searcher = null;
@ -130,7 +131,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
true
true,
randomBoolean()
)
) {
searcher = null;
@ -152,7 +154,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
false
false,
randomBoolean()
)
) {
searcher = null;
@ -167,7 +170,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
true
true,
randomBoolean()
)
) {
searcher = null;
@ -187,7 +191,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE),
fromSeqNo,
toSeqNo,
true
true,
randomBoolean()
)
) {
searcher = null;
@ -199,7 +204,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
// Get snapshot via engine will auto refresh
fromSeqNo = randomLongBetween(0, numOps - 1);
toSeqNo = randomLongBetween(fromSeqNo, numOps - 1);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, randomBoolean())) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, randomBoolean(), randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}
}
@ -230,8 +235,11 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
engine.refresh("test");
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, between(1, 100), 0, maxSeqNo, false)) {
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
final boolean accurateCount = randomBoolean();
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, between(1, 100), 0, maxSeqNo, false, accurateCount)) {
if (accurateCount == true) {
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
}
Translog.Operation op;
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
@ -306,7 +314,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
long fromSeqNo = followerCheckpoint + 1;
long batchSize = randomLongBetween(0, 100);
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) {
translogHandler.run(follower, snapshot);
}
}
@ -352,7 +360,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
public void testOverFlow() throws Exception {
long fromSeqNo = randomLongBetween(0, 5);
long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(
error.getMessage(),

@ -499,7 +499,7 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT
assertThat(snapshot.totalOperations(), equalTo(0));
}
}
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
@ -517,7 +517,7 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2)));
}
}
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
@ -619,7 +619,7 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);

@ -225,7 +225,7 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase {
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(3);
try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.size(6));
}
}

@ -1312,7 +1312,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
*/
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
@ -1326,7 +1326,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
*/
public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);