Always use Lucene index in peer recovery (#2077)

With soft deletes no longer optional, peer recovery is switched to always use the
lucene index instead of replaying operations from the translog.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
This commit is contained in:
Nick Knize 2022-02-23 11:05:40 -06:00 committed by GitHub
parent 3e9a420533
commit 44441d8fc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 53 additions and 406 deletions

View File

@ -43,7 +43,6 @@ import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue; import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexService; import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService; import org.opensearch.indices.IndicesService;
@ -122,7 +121,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch); final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
// simulate a peer recovery which locks the soft deletes policy on the primary // simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await(); latch.await();
retentionLock.close(); retentionLock.close();
@ -175,7 +174,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch); final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
// simulate a peer recovery which locks the soft deletes policy on the primary // simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await(); latch.await();
retentionLock.close(); retentionLock.close();
@ -186,7 +185,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(id, countDownLatchListener(latch)); primary.removeRetentionLease(id, countDownLatchListener(latch));
// simulate a peer recovery which locks the soft deletes policy on the primary // simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {};
currentRetentionLeases.remove(id); currentRetentionLeases.remove(id);
latch.await(); latch.await();
retentionLock.close(); retentionLock.close();

View File

@ -730,7 +730,7 @@ public abstract class Engine implements Closeable {
/** /**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/ */
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource); public abstract Closeable acquireHistoryRetentionLock();
/** /**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@ -744,51 +744,7 @@ public abstract class Engine implements Closeable {
boolean requiredFullRange boolean requiredFullRange
) throws IOException; ) throws IOException;
/** public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
* Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting
* seqno range (both inclusive).
*/
public Translog.Snapshot newChangesSnapshot(
String source,
HistorySource historySource,
MapperService mapperService,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
}
/**
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public abstract Translog.Snapshot readHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) throws IOException;
/**
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
*/
public abstract int estimateNumberOfHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) throws IOException;
/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) throws IOException;
/** /**
* Gets the minimum retained sequence number for this engine. * Gets the minimum retained sequence number for this engine.
@ -2029,12 +1985,4 @@ public abstract class Engine implements Closeable {
* to advance this marker to at least the given sequence number. * to advance this marker to at least the given sequence number.
*/ */
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
/**
* Whether we should read history operations from translog or Lucene index
*/
public enum HistorySource {
TRANSLOG,
INDEX
}
} }

View File

@ -608,45 +608,6 @@ public class InternalEngine extends Engine {
revisitIndexDeletionPolicyOnTranslogSynced(); revisitIndexDeletionPolicyOnTranslogSynced();
} }
/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) throws IOException {
if (historySource == HistorySource.INDEX) {
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
}
/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) throws IOException {
if (historySource == HistorySource.INDEX) {
try (
Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)
) {
return snapshot.totalOperations();
}
} else {
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
}
@Override @Override
public TranslogStats getTranslogStats() { public TranslogStats getTranslogStats() {
return getTranslog().stats(); return getTranslog().stats();
@ -2815,22 +2776,6 @@ public class InternalEngine extends Engine {
return numDocUpdates.count(); return numDocUpdates.count();
} }
@Override
public Translog.Snapshot newChangesSnapshot(
String source,
HistorySource historySource,
MapperService mapperService,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException {
if (historySource == HistorySource.INDEX) {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
} else {
return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}
}
@Override @Override
public Translog.Snapshot newChangesSnapshot( public Translog.Snapshot newChangesSnapshot(
String source, String source,
@ -2865,28 +2810,8 @@ public class InternalEngine extends Engine {
} }
} }
@Override public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, MapperService mapperService, long startingSeqNo) return getMinRetainedSeqNo() <= startingSeqNo;
throws IOException {
if (historySource == HistorySource.INDEX) {
return getMinRetainedSeqNo() <= startingSeqNo;
} else {
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsProcessed(operation.seqNo());
}
}
}
return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
}
} }
/** /**
@ -2897,13 +2822,8 @@ public class InternalEngine extends Engine {
return softDeletesPolicy.getMinRetainedSeqNo(); return softDeletesPolicy.getMinRetainedSeqNo();
} }
@Override public Closeable acquireHistoryRetentionLock() {
public Closeable acquireHistoryRetentionLock(HistorySource historySource) { return softDeletesPolicy.acquireRetentionLock();
if (historySource == HistorySource.INDEX) {
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
}
} }
/** /**

View File

@ -320,7 +320,7 @@ public class ReadOnlyEngine extends Engine {
public void syncTranslog() {} public void syncTranslog() {}
@Override @Override
public Closeable acquireHistoryRetentionLock(HistorySource historySource) { public Closeable acquireHistoryRetentionLock() {
return () -> {}; return () -> {};
} }
@ -335,33 +335,7 @@ public class ReadOnlyEngine extends Engine {
return newEmptySnapshot(); return newEmptySnapshot();
} }
@Override public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
public Translog.Snapshot readHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) {
return newEmptySnapshot();
}
@Override
public int estimateNumberOfHistoryOperations(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) {
return 0;
}
@Override
public boolean hasCompleteOperationHistory(
String reason,
HistorySource historySource,
MapperService mapperService,
long startingSeqNo
) {
// we can do operation-based recovery if we don't have to replay any operation. // we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo(); return startingSeqNo > seqNoStats.getMaxSeqNo();
} }

View File

@ -2304,23 +2304,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/** /**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/ */
public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) { public Closeable acquireHistoryRetentionLock() {
return getEngine().acquireHistoryRetentionLock(source); return getEngine().acquireHistoryRetentionLock();
}
/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
*/
public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo);
}
/**
* Creates a new history snapshot for reading operations since the provided starting seqno (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
} }
/** /**
@ -2329,17 +2314,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* the provided starting seqno (inclusive) and ending seqno (inclusive) * the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files. * The returned snapshot can be retrieved from either Lucene index or translog files.
*/ */
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo, long endSeqNo) public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
throws IOException { return getEngine().newChangesSnapshot(reason, mapperService, startingSeqNo, endSeqNo, true);
return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true);
} }
/** /**
* Checks if we have a completed history of operations since the given starting seqno (inclusive). * Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
*/ */
public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo); return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);
} }
/** /**
@ -2529,7 +2513,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertPrimaryMode(); assert assertPrimaryMode();
verifyNotClosed(); verifyNotClosed();
ensureSoftDeletesEnabled("retention leases"); ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { try (Closeable ignore = acquireHistoryRetentionLock()) {
final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL
? getMinRetainedSeqNo() ? getMinRetainedSeqNo()
: retainingSequenceNumber; : retainingSequenceNumber;
@ -2552,7 +2536,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertPrimaryMode(); assert assertPrimaryMode();
verifyNotClosed(); verifyNotClosed();
ensureSoftDeletesEnabled("retention leases"); ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { try (Closeable ignore = acquireHistoryRetentionLock()) {
final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL
? getMinRetainedSeqNo() ? getMinRetainedSeqNo()
: retainingSequenceNumber; : retainingSequenceNumber;

View File

@ -49,7 +49,6 @@ import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.core.internal.io.IOUtils; import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.Translog;
import org.opensearch.tasks.Task; import org.opensearch.tasks.Task;
@ -99,16 +98,13 @@ public class PrimaryReplicaSyncer {
Translog.Snapshot snapshot = null; Translog.Snapshot snapshot = null;
try { try {
final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1; final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1;
assert startingSeqNo >= 0 : "startingSeqNo must be non-negative; got [" + startingSeqNo + "]";
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
final ShardId shardId = indexShard.shardId(); final ShardId shardId = indexShard.shardId();
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down // Also fail the resync early if the shard is shutting down
snapshot = indexShard.getHistoryOperations( snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
"resync",
indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
startingSeqNo
);
final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override @Override

View File

@ -132,6 +132,7 @@ public class RecoverySourceHandler {
private final CancellableThreads cancellableThreads = new CancellableThreads(); private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>(); private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>(); private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
private static final String PEER_RECOVERY_NAME = "peer-recovery";
public RecoverySourceHandler( public RecoverySourceHandler(
IndexShard shard, IndexShard shard,
@ -187,7 +188,6 @@ public class RecoverySourceHandler {
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
}; };
final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>(); final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
runUnderPrimaryPermit(() -> { runUnderPrimaryPermit(() -> {
@ -211,19 +211,13 @@ public class RecoverySourceHandler {
cancellableThreads, cancellableThreads,
logger logger
); );
final Engine.HistorySource historySource; final Closeable retentionLock = shard.acquireHistoryRetentionLock();
if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
historySource = Engine.HistorySource.INDEX;
} else {
historySource = Engine.HistorySource.TRANSLOG;
}
final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
resources.add(retentionLock); resources.add(retentionLock);
final long startingSeqNo; final long startingSeqNo;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& isTargetSameHistory() && isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo()) && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo())
&& (historySource == Engine.HistorySource.TRANSLOG && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false)
|| (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
@ -231,7 +225,7 @@ public class RecoverySourceHandler {
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
// without having a complete history. // without having a complete history.
if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock.close(); retentionLock.close();
logger.trace("history is retained by {}", retentionLeaseRef.get()); logger.trace("history is retained by {}", retentionLeaseRef.get());
@ -274,13 +268,11 @@ public class RecoverySourceHandler {
// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
// down. // down.
startingSeqNo = softDeletesEnabled startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
: 0;
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
try { try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo); final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store()); final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore); resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
@ -327,10 +319,7 @@ public class RecoverySourceHandler {
sendFileStep.whenComplete(r -> { sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog // For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog( prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo),
prepareEngineStep
);
}, onFailure); }, onFailure);
prepareEngineStep.whenComplete(prepareEngineTime -> { prepareEngineStep.whenComplete(prepareEngineTime -> {
@ -350,11 +339,10 @@ public class RecoverySourceHandler {
); );
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace( if (logger.isTraceEnabled()) {
"snapshot translog for recovery; current size is [{}]", logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo) }
); final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
resources.add(phase2Snapshot); resources.add(phase2Snapshot);
retentionLock.close(); retentionLock.close();
@ -415,6 +403,12 @@ public class RecoverySourceHandler {
return targetHistoryUUID.equals(shard.getHistoryUUID()); return targetHistoryUUID.equals(shard.getHistoryUUID());
} }
private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
}
static void runUnderPrimaryPermit( static void runUnderPrimaryPermit(
CancellableThreads.Interruptible runnable, CancellableThreads.Interruptible runnable,
String reason, String reason,

View File

@ -344,11 +344,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private boolean hasUncommittedOperations() throws IOException { private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
return indexShard.estimateNumberOfHistoryOperations( try (
"peer-recovery", Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
indexShard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, ) {
localCheckpointOfCommit + 1 return snapshot.totalOperations() > 0;
) > 0; }
} }
@Override @Override

View File

@ -6163,10 +6163,8 @@ public class InternalEngineTests extends EngineTestCase {
} }
} }
MapperService mapperService = createMapperService("test"); MapperService mapperService = createMapperService("test");
List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService); List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, mapperService);
List<Translog.Operation> translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService);
assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
} }
} }
@ -6317,7 +6315,7 @@ public class InternalEngineTests extends EngineTestCase {
if (rarely()) { if (rarely()) {
engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID()); engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID());
} }
try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { try (Closeable ignored = engine.acquireHistoryRetentionLock()) {
long minRetainSeqNos = engine.getMinRetainedSeqNo(); long minRetainSeqNos = engine.getMinRetainedSeqNo();
assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);

View File

@ -508,18 +508,12 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT
assertThat(snapshot.totalOperations(), equalTo(0)); assertThat(snapshot.totalOperations(), equalTo(0));
} }
} }
try ( try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Snapshot snapshot = shard.getHistoryOperations(
"test",
shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
0
)
) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
} }
} }
// the failure replicated directly from the replication channel. // the failure replicated directly from the replication channel.
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); indexResp = shards.index(new IndexRequest(index.getName()).id("any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
Translog.NoOp noop2 = new Translog.NoOp(1, primaryTerm, indexException.toString()); Translog.NoOp noop2 = new Translog.NoOp(1, primaryTerm, indexException.toString());
expectedTranslogOps.add(noop2); expectedTranslogOps.add(noop2);
@ -532,13 +526,7 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2))); assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2)));
} }
} }
try ( try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Snapshot snapshot = shard.getHistoryOperations(
"test",
shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
0
)
) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
} }
} }

View File

@ -53,34 +53,23 @@ import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.VersionType; import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.tasks.Task; import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskManager;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class PrimaryReplicaSyncerTests extends IndexShardTestCase { public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
@ -238,41 +227,6 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
} }
} }
public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
IndexShard shard = spy(newStartedShard(true));
when(shard.getLastKnownGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO);
int numOps = between(0, 20);
List<Translog.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
operations.add(
new Translog.Index(
"_doc",
Integer.toString(i),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i,
primaryTerm,
new byte[] { 1 }
)
);
}
Engine.HistorySource source = shard.indexSettings.isSoftDeleteEnabled()
? Engine.HistorySource.INDEX
: Engine.HistorySource.TRANSLOG;
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
List<Translog.Operation> sentOperations = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
sentOperations.addAll(Arrays.asList(request.getOperations()));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.actionGet();
assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList())));
closeShards(shard);
}
public void testStatusSerialization() throws IOException { public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status( PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(
randomAlphaOfLength(10), randomAlphaOfLength(10),

View File

@ -41,7 +41,6 @@ import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper; import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.bulk.BulkShardRequest;
@ -69,7 +68,6 @@ import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store; import org.opensearch.index.store.Store;
import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.Translog;
import org.opensearch.test.VersionUtils;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -146,108 +144,6 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase {
} }
} }
public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception {
/*
* The flow of this test:
* - delete #1
* - roll generation (to create gen 2)
* - index #0
* - index #3
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
* - index #2
* - index #5
* - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
*/
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_2_0_0))
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
.build();
try (ReplicationGroup shards = createGroup(1, settings)) {
shards.startAll();
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
final String indexName = orgReplica.shardId().getIndexName();
final long primaryTerm = orgReplica.getOperationPrimaryTerm();
// delete #1
orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
orgReplica.applyDeleteOperationOnReplica(1, primaryTerm, 2, "type", "id");
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
// index #0
orgReplica.applyIndexOperationOnReplica(
0,
primaryTerm,
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)
);
// index #3
orgReplica.applyIndexOperationOnReplica(
3,
primaryTerm,
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON)
);
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(
2,
primaryTerm,
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON)
);
orgReplica.sync(); // advance local checkpoint
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(
5,
primaryTerm,
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON)
);
final int translogOps;
if (randomBoolean()) {
if (randomBoolean()) {
logger.info("--> flushing shard (translog will be trimmed)");
IndexMetadata.Builder builder = IndexMetadata.builder(orgReplica.indexSettings().getIndexMetadata());
builder.settings(
Settings.builder()
.put(orgReplica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
);
orgReplica.indexSettings().updateIndexMetadata(builder.build());
orgReplica.onSettingsChanged();
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
} else {
logger.info("--> flushing shard (translog will be retained)");
translogOps = 6; // 5 ops + seqno gaps
}
flushShard(orgReplica);
} else {
translogOps = 6; // 5 ops + seqno gaps
}
final IndexShard orgPrimary = shards.getPrimary();
shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(3);
assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps));
}
}
public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
@ -329,7 +225,7 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase {
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica); shards.recoverReplica(newReplica);
shards.assertAllEqual(3); shards.assertAllEqual(3);
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) { try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
assertThat(snapshot, SnapshotMatchers.size(6)); assertThat(snapshot, SnapshotMatchers.size(6));
} }
} }

View File

@ -1350,13 +1350,9 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
/** /**
* Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source. * Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source.
*/ */
public static List<Translog.Operation> readAllOperationsBasedOnSource( public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine, MapperService mapper) throws IOException {
Engine engine,
Engine.HistorySource historySource,
MapperService mapper
) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>(); final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper, 0, Long.MAX_VALUE, false)) { try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
Translog.Operation op; Translog.Operation op;
while ((op = snapshot.next()) != null) { while ((op = snapshot.next()) != null) {
operations.add(op); operations.add(op);