Recover retention leases during peer recovery (#38435)
This commit integrates retention leases with recovery. With this change, we copy the current retention leases on primary to the replica during phase two of recovery. At this point, the replica will have been added to the replication group and so is already receiving retention lease sync requests from the primary. This means that if any retention lease syncs are triggered on the primary after we sample the retention leases here during phase two, that sync request will also arrive on the replica ensuring that the replica is from this point on up to date with the retention leases on the primary. We have to copy these during phase two since we will be applying indexing operations, potentially triggering merges, and therefore must ensure the correct retention leases are in place beforehand.
This commit is contained in:
parent
fb0ec26fd4
commit
79a45b47da
|
@ -132,6 +132,7 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
|||
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -3082,7 +3083,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
|
||||
*
|
||||
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
|
||||
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
|
||||
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
|
||||
*/
|
||||
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
|
||||
assert seqNo != UNASSIGNED_SEQ_NO
|
||||
|
|
|
@ -518,17 +518,21 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|||
}
|
||||
});
|
||||
};
|
||||
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
|
||||
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
|
||||
ActionListener.wrap(
|
||||
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
|
||||
e -> {
|
||||
if (e instanceof MapperException) {
|
||||
retryOnMappingException.accept(e);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
})
|
||||
recoveryTarget.indexTranslogOperations(
|
||||
request.operations(),
|
||||
request.totalTranslogOps(),
|
||||
request.maxSeenAutoIdTimestampOnPrimary(),
|
||||
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
|
||||
request.retentionLeases(),
|
||||
ActionListener.wrap(
|
||||
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
|
||||
e -> {
|
||||
if (e instanceof MapperException) {
|
||||
retryOnMappingException.accept(e);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||
|
@ -231,8 +232,16 @@ public class RecoverySourceHandler {
|
|||
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
|
||||
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
|
||||
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
|
||||
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
|
||||
final RetentionLeases retentionLeases = shard.getRetentionLeases();
|
||||
phase2(
|
||||
startingSeqNo,
|
||||
requiredSeqNoRangeStart,
|
||||
endingSeqNo,
|
||||
phase2Snapshot,
|
||||
maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
sendSnapshotStep);
|
||||
sendSnapshotStep.whenComplete(
|
||||
r -> IOUtils.close(phase2Snapshot),
|
||||
e -> {
|
||||
|
@ -517,8 +526,15 @@ public class RecoverySourceHandler {
|
|||
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
|
||||
* @param listener a listener which will be notified with the local checkpoint on the target.
|
||||
*/
|
||||
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
|
||||
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
|
||||
void phase2(
|
||||
final long startingSeqNo,
|
||||
final long requiredSeqNoRangeStart,
|
||||
final long endingSeqNo,
|
||||
final Translog.Snapshot snapshot,
|
||||
final long maxSeenAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<SendSnapshotResult> listener) throws IOException {
|
||||
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
|
||||
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
|
||||
assert startingSeqNo <= requiredSeqNoRangeStart :
|
||||
|
@ -584,25 +600,50 @@ public class RecoverySourceHandler {
|
|||
listener::onFailure
|
||||
);
|
||||
|
||||
sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(),
|
||||
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
|
||||
sendBatch(
|
||||
readNextBatch,
|
||||
true,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
snapshot.totalOperations(),
|
||||
maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
batchedListener);
|
||||
}
|
||||
|
||||
private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
|
||||
long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
|
||||
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
|
||||
private void sendBatch(
|
||||
final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch,
|
||||
final boolean firstBatch,
|
||||
final long targetLocalCheckpoint,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) throws IOException {
|
||||
final List<Translog.Operation> operations = nextBatch.get();
|
||||
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
|
||||
if (operations.isEmpty() == false || firstBatch) {
|
||||
cancellableThreads.execute(() -> {
|
||||
recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
|
||||
ActionListener.wrap(
|
||||
newCheckpoint -> {
|
||||
sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
|
||||
totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
recoveryTarget.indexTranslogOperations(
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
ActionListener.wrap(
|
||||
newCheckpoint -> {
|
||||
sendBatch(
|
||||
nextBatch,
|
||||
false,
|
||||
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
|
||||
totalTranslogOps,
|
||||
maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
listener);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(targetLocalCheckpoint);
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.MapperException;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
|
||||
|
@ -397,8 +398,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
|
|||
}
|
||||
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
|
||||
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestampOnPrimary,
|
||||
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
ActionListener.completeWith(listener, () -> {
|
||||
final RecoveryState.Translog translog = state().getTranslog();
|
||||
translog.totalOperations(totalTranslogOps);
|
||||
|
@ -418,6 +424,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
|
|||
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
|
||||
*/
|
||||
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
|
||||
/*
|
||||
* We have to update the retention leases before we start applying translog operations to ensure we are retaining according to
|
||||
* the policy.
|
||||
*/
|
||||
indexShard().updateRetentionLeasesOnReplica(retentionLeases);
|
||||
for (Translog.Operation operation : operations) {
|
||||
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
|
||||
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -32,14 +33,15 @@ public interface RecoveryTargetHandler {
|
|||
|
||||
/**
|
||||
* Prepares the target to receive translog operations, after all file have been copied
|
||||
* @param fileBasedRecovery whether or not this call is part of an file based recovery
|
||||
* @param totalTranslogOps total translog operations expected to be sent
|
||||
*
|
||||
* @param fileBasedRecovery whether or not this call is part of an file based recovery
|
||||
* @param totalTranslogOps total translog operations expected to be sent
|
||||
*/
|
||||
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
|
||||
|
||||
/**
|
||||
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
|
||||
* updates the global checkpoint.
|
||||
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates
|
||||
* the global checkpoint.
|
||||
*
|
||||
* @param globalCheckpoint the global checkpoint on the recovery source
|
||||
* @param listener the listener which will be notified when this method is completed
|
||||
|
@ -67,11 +69,17 @@ public interface RecoveryTargetHandler {
|
|||
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
|
||||
* the primary shard when capturing these operations. This value is at least as high as the
|
||||
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
|
||||
* @param retentionLeases the retention leases on the primary
|
||||
* @param listener a listener which will be notified with the local checkpoint on the target
|
||||
* after these operations are successfully indexed on the target.
|
||||
*/
|
||||
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
|
||||
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
|
||||
void indexTranslogOperations(
|
||||
List<Translog.Operation> operations,
|
||||
int totalTranslogOps,
|
||||
long maxSeenAutoIdTimestampOnPrimary,
|
||||
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
|
||||
RetentionLeases retentionLeases,
|
||||
ActionListener<Long> listener);
|
||||
|
||||
/**
|
||||
* Notifies the target of the files it is going to receive
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -39,18 +40,26 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
|
||||
private long maxSeenAutoIdTimestampOnPrimary;
|
||||
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
|
||||
private RetentionLeases retentionLeases;
|
||||
|
||||
public RecoveryTranslogOperationsRequest() {
|
||||
}
|
||||
|
||||
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps,
|
||||
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) {
|
||||
RecoveryTranslogOperationsRequest(
|
||||
final long recoveryId,
|
||||
final ShardId shardId,
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestampOnPrimary,
|
||||
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
|
||||
final RetentionLeases retentionLeases) {
|
||||
this.recoveryId = recoveryId;
|
||||
this.shardId = shardId;
|
||||
this.operations = operations;
|
||||
this.totalTranslogOps = totalTranslogOps;
|
||||
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
|
||||
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
|
||||
this.retentionLeases = retentionLeases;
|
||||
}
|
||||
|
||||
public long recoveryId() {
|
||||
|
@ -77,6 +86,10 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
return maxSeqNoOfUpdatesOrDeletesOnPrimary;
|
||||
}
|
||||
|
||||
public RetentionLeases retentionLeases() {
|
||||
return retentionLeases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
@ -95,6 +108,11 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
// UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no
|
||||
maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
retentionLeases = new RetentionLeases(in);
|
||||
} else {
|
||||
retentionLeases = RetentionLeases.EMPTY;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -110,5 +128,8 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
|
||||
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
retentionLeases.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
|
@ -113,10 +114,21 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
|
||||
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestampOnPrimary,
|
||||
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
|
||||
recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary);
|
||||
recoveryId,
|
||||
shardId,
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxSeenAutoIdTimestampOnPrimary,
|
||||
maxSeqNoOfDeletesOrUpdatesOnPrimary,
|
||||
retentionLeases);
|
||||
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
|
||||
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure),
|
||||
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.index.engine.EngineTestCase;
|
|||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.engine.InternalEngineTests;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
|
@ -502,13 +503,17 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
AtomicBoolean opsSent = new AtomicBoolean(false);
|
||||
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
|
||||
recoveryStart.countDown();
|
||||
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
|
||||
}) {
|
||||
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
|
||||
long maxSeenAutoIdTimestamp, long msu, ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestamp,
|
||||
final long msu,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
opsSent.set(true);
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, listener);
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, retentionLeases, listener);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
@ -575,9 +580,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
replica,
|
||||
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
|
||||
@Override
|
||||
public void indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps,
|
||||
final long maxAutoIdTimestamp, long maxSeqNoOfUpdates,
|
||||
ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdates,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
// index a doc which is not part of the snapshot, but also does not complete on replica
|
||||
replicaEngineFactory.latchIndexers(1);
|
||||
threadPool.generic().submit(() -> {
|
||||
|
@ -604,7 +613,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener);
|
||||
super.indexTranslogOperations(
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxAutoIdTimestamp,
|
||||
maxSeqNoOfUpdates,
|
||||
retentionLeases,
|
||||
listener);
|
||||
}
|
||||
});
|
||||
pendingDocActiveWithExtraDocIndexed.await();
|
||||
|
@ -846,12 +861,17 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
}
|
||||
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
|
||||
long maxAutoIdTimestamp, long maxSeqNoOfUpdates, ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdates,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
if (hasBlocked() == false) {
|
||||
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
|
||||
}
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener);
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -95,7 +95,7 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase {
|
|||
final String source = randomAlphaOfLength(8);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
|
||||
// simulate a peer-recovery which locks the soft-deletes policy on the primary.
|
||||
// simulate a peer recovery which locks the soft deletes policy on the primary
|
||||
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
|
||||
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
|
||||
latch.await();
|
||||
|
@ -205,4 +205,67 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRetentionLeasesSyncOnRecovery() throws Exception {
|
||||
final int numberOfReplicas = 1;
|
||||
/*
|
||||
* We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only
|
||||
* source of retention leases on the replicas would be from the commit point and recovery.
|
||||
*/
|
||||
final Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(24))
|
||||
.build();
|
||||
createIndex("index", settings);
|
||||
ensureYellow("index");
|
||||
// exclude the replicas from being allocated
|
||||
allowNodes("index", 1);
|
||||
final AcknowledgedResponse response = client().admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("index").setSettings(Settings.builder().put("index.number_of_replicas", numberOfReplicas).build())
|
||||
.get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
|
||||
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
|
||||
final IndexShard primary = internalCluster()
|
||||
.getInstance(IndicesService.class, primaryShardNodeName)
|
||||
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
||||
final int length = randomIntBetween(1, 8);
|
||||
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>();
|
||||
for (int i = 0; i < length; i++) {
|
||||
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
|
||||
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
|
||||
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
|
||||
latch.await();
|
||||
/*
|
||||
* Now renew the leases; since we do not flush immediately on renewal, this means that the latest retention leases will not be
|
||||
* in the latest commit point and therefore not transferred during the file-copy phase of recovery.
|
||||
*/
|
||||
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
|
||||
}
|
||||
|
||||
// now allow the replicas to be allocated and wait for recovery to finalize
|
||||
allowNodes("index", 1 + numberOfReplicas);
|
||||
ensureGreen("index");
|
||||
|
||||
// check current retention leases have been synced to all replicas
|
||||
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
|
||||
final String replicaShardNodeId = replicaShard.currentNodeId();
|
||||
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
|
||||
final IndexShard replica = internalCluster()
|
||||
.getInstance(IndicesService.class, replicaShardNodeName)
|
||||
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
||||
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
|
||||
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
|
||||
|
||||
// check retention leases have been committed on the replica
|
||||
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
|
||||
replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
|
|||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
@ -2426,10 +2427,20 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
|
||||
}) {
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestamp,
|
||||
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener){
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
|
||||
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxSeenAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener){
|
||||
super.indexTranslogOperations(
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxSeenAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
|
||||
}
|
||||
}, true, true);
|
||||
|
||||
|
@ -2533,14 +2544,26 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
|
||||
}) {
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp,
|
||||
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener){
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
|
||||
ActionListener.wrap(checkpoint -> {
|
||||
listener.onResponse(checkpoint);
|
||||
// Shard should now be active since we did recover:
|
||||
assertTrue(replica.isActive());
|
||||
}, listener::onFailure));
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener){
|
||||
super.indexTranslogOperations(
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
ActionListener.wrap(
|
||||
checkpoint -> {
|
||||
listener.onResponse(checkpoint);
|
||||
// Shard should now be active since we did recover:
|
||||
assertTrue(replica.isActive());
|
||||
},
|
||||
listener::onFailure));
|
||||
}
|
||||
}, false, true);
|
||||
|
||||
|
@ -2583,13 +2606,25 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp,
|
||||
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) {
|
||||
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
|
||||
ActionListener.wrap(checkpoint -> {
|
||||
assertListenerCalled.accept(replica);
|
||||
listener.onResponse(checkpoint);
|
||||
}, listener::onFailure));
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long maxAutoIdTimestamp,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
super.indexTranslogOperations(
|
||||
operations,
|
||||
totalTranslogOps,
|
||||
maxAutoIdTimestamp,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
retentionLeases,
|
||||
ActionListener.wrap(
|
||||
checkpoint -> {
|
||||
assertListenerCalled.accept(replica);
|
||||
listener.onResponse(checkpoint);
|
||||
},
|
||||
listener::onFailure));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
|||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -238,7 +239,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
|
||||
ActionListener<Long> listener) {
|
||||
RetentionLeases retentionLeases, ActionListener<Long> listener) {
|
||||
shippedOps.addAll(operations);
|
||||
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
|
||||
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
|
||||
|
@ -247,7 +248,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
|
||||
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
|
||||
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), future);
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
|
||||
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
|
||||
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
|
||||
assertThat(result.totalOperations, equalTo(expectedOps));
|
||||
|
@ -265,7 +266,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
|
||||
expectThrows(IllegalStateException.class, () -> {
|
||||
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), failedFuture);
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture);
|
||||
failedFuture.actionGet();
|
||||
});
|
||||
}
|
||||
|
@ -285,7 +286,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
|
||||
long msu, ActionListener<Long> listener) {
|
||||
long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
|
||||
if (randomBoolean()) {
|
||||
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
|
||||
} else {
|
||||
|
@ -299,7 +300,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
|
||||
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
|
||||
handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), future);
|
||||
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
|
||||
if (wasFailed.get()) {
|
||||
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
|
||||
}
|
||||
|
@ -498,11 +499,11 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
|
||||
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes,
|
||||
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases,
|
||||
ActionListener<SendSnapshotResult> listener) throws IOException {
|
||||
phase2Called.set(true);
|
||||
super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
|
||||
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
|
||||
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -716,8 +717,13 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
|
||||
ActionListener<Long> listener) {
|
||||
public void indexTranslogOperations(
|
||||
final List<Translog.Operation> operations,
|
||||
final int totalTranslogOps,
|
||||
final long timestamp,
|
||||
final long msu,
|
||||
final RetentionLeases retentionLeases,
|
||||
final ActionListener<Long> listener) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue