mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 05:44:59 +00:00
Use Lucene history in primary-replica resync (#33178)
This commit makes primary-replica resyncer use Lucene as the source of history operation instead of translog if soft-deletes is enabled. With this change, we no longer expose translog snapshot directly in IndexShard. Relates #29530
This commit is contained in:
parent
5954354e62
commit
e2b931e80b
@ -592,12 +592,6 @@ public abstract class Engine implements Closeable {
|
||||
*/
|
||||
public abstract Closeable acquireRetentionLockForPeerRecovery();
|
||||
|
||||
/**
|
||||
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
|
||||
* The caller has to close the returned snapshot after finishing the reading.
|
||||
*/
|
||||
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;
|
||||
|
||||
public abstract TranslogStats getTranslogStats();
|
||||
|
||||
/**
|
||||
|
@ -480,11 +480,6 @@ public class InternalEngine extends Engine {
|
||||
revisitIndexDeletionPolicyOnTranslogSynced();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new history snapshot for reading operations since the provided seqno.
|
||||
* The returned snapshot can be retrieved from either Lucene index or translog files.
|
||||
|
@ -1635,15 +1635,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||
return getEngine().acquireRetentionLockForPeerRecovery();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#.
|
||||
* The caller has to close the returned snapshot after finishing the reading.
|
||||
*/
|
||||
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
// TODO: Remove this method after primary-replica resync use soft-deletes
|
||||
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
|
||||
*/
|
||||
|
@ -89,8 +89,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
|
||||
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
|
||||
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
|
||||
// Also fail the resync early if the shard is shutting down
|
||||
// TODO: A follow-up to make resync using soft-deletes
|
||||
snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
|
||||
snapshot = indexShard.getHistoryOperations("resync", startingSeqNo);
|
||||
final Translog.Snapshot originalSnapshot = snapshot;
|
||||
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
|
||||
@Override
|
||||
|
@ -106,17 +106,22 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
|
||||
.isPresent(),
|
||||
is(false));
|
||||
}
|
||||
|
||||
assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations());
|
||||
if (syncNeeded && globalCheckPoint < numDocs - 1) {
|
||||
long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
|
||||
assertEquals(skippedOps, resyncTask.getSkippedOperations());
|
||||
assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations());
|
||||
if (shard.indexSettings.isSoftDeleteEnabled()) {
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
|
||||
} else {
|
||||
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
|
||||
}
|
||||
} else {
|
||||
assertEquals(0, resyncTask.getSkippedOperations());
|
||||
assertEquals(0, resyncTask.getResyncedOperations());
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(0));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(0));
|
||||
}
|
||||
|
||||
closeShards(shard);
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
|
||||
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
|
||||
TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);
|
||||
|
||||
try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) {
|
||||
try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
|
||||
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user