Do not mutate RecoveryResponse (#37204)

Today we create a global instance of RecoveryResponse then mutate it
when executing each recovery step. This is okay for the current
sequential recovery flow but  not suitable for an asynchronous recovery
which we are targeting. With this commit, we return the result of each
step separately, then construct a RecoveryResponse at the end.

Relates #37174
This commit is contained in:
Nhat Nguyen 2019-01-08 16:12:18 -05:00 committed by GitHub
parent 86b71930f6
commit 87ac3103f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 170 additions and 185 deletions

View File

@ -304,9 +304,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public RecoveryResponse read(StreamInput in) throws IOException {
RecoveryResponse recoveryResponse = new RecoveryResponse();
recoveryResponse.readFrom(in);
return recoveryResponse;
return new RecoveryResponse(in);
}
})
);

View File

@ -24,53 +24,46 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
class RecoveryResponse extends TransportResponse {
final class RecoveryResponse extends TransportResponse {
List<String> phase1FileNames = new ArrayList<>();
List<Long> phase1FileSizes = new ArrayList<>();
List<String> phase1ExistingFileNames = new ArrayList<>();
List<Long> phase1ExistingFileSizes = new ArrayList<>();
long phase1TotalSize;
long phase1ExistingTotalSize;
long phase1Time;
long phase1ThrottlingWaitTime;
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
final List<String> phase1ExistingFileNames;
final List<Long> phase1ExistingFileSizes;
final long phase1TotalSize;
final long phase1ExistingTotalSize;
final long phase1Time;
final long phase1ThrottlingWaitTime;
long startTime;
final long startTime;
int phase2Operations;
long phase2Time;
final int phase2Operations;
final long phase2Time;
RecoveryResponse() {
RecoveryResponse(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize,
long phase1Time, long phase1ThrottlingWaitTime, long startTime, int phase2Operations, long phase2Time) {
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.phase1TotalSize = phase1TotalSize;
this.phase1ExistingTotalSize = phase1ExistingTotalSize;
this.phase1Time = phase1Time;
this.phase1ThrottlingWaitTime = phase1ThrottlingWaitTime;
this.startTime = startTime;
this.phase2Operations = phase2Operations;
this.phase2Time = phase2Time;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
phase1FileNames = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
phase1FileNames.add(in.readString());
}
size = in.readVInt();
phase1FileSizes = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
phase1FileSizes.add(in.readVLong());
}
size = in.readVInt();
phase1ExistingFileNames = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileNames.add(in.readString());
}
size = in.readVInt();
phase1ExistingFileSizes = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
RecoveryResponse(StreamInput in) throws IOException {
super(in);
phase1FileNames = in.readList(StreamInput::readString);
phase1FileSizes = in.readList(StreamInput::readVLong);
phase1ExistingFileNames = in.readList(StreamInput::readString);
phase1ExistingFileSizes = in.readList(StreamInput::readVLong);
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
phase1Time = in.readVLong();
@ -83,24 +76,10 @@ class RecoveryResponse extends TransportResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeString(name);
}
out.writeVInt(phase1FileSizes.size());
for (long size : phase1FileSizes) {
out.writeVLong(size);
}
out.writeVInt(phase1ExistingFileNames.size());
for (String name : phase1ExistingFileNames) {
out.writeString(name);
}
out.writeVInt(phase1ExistingFileSizes.size());
for (long size : phase1ExistingFileSizes) {
out.writeVLong(size);
}
out.writeStringList(phase1FileNames);
out.writeCollection(phase1FileSizes, StreamOutput::writeVLong);
out.writeStringList(phase1ExistingFileNames);
out.writeCollection(phase1ExistingFileSizes, StreamOutput::writeVLong);
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
out.writeVLong(phase1Time);

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
@ -64,6 +65,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
@ -95,8 +97,6 @@ public class RecoverySourceHandler {
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
protected final RecoveryResponse response;
private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
protected void onCancel(String reason, @Nullable Exception suppressedException) {
@ -122,7 +122,6 @@ public class RecoverySourceHandler {
this.shardId = this.request.shardId().id();
this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
this.chunkSizeInBytes = fileChunkSizeInBytes;
this.response = new RecoveryResponse();
}
public StartRecoveryRequest getRequest() {
@ -149,10 +148,12 @@ public class RecoverySourceHandler {
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
final SendFileResult sendFileResult;
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
sendFileResult = SendFileResult.EMPTY;
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
@ -169,7 +170,7 @@ public class RecoverySourceHandler {
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
@ -184,9 +185,10 @@ public class RecoverySourceHandler {
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";
final TimeValue prepareEngineTime;
try {
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
@ -213,21 +215,25 @@ public class RecoverySourceHandler {
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final long targetLocalCheckpoint;
final SendSnapshotResult sendSnapshotResult;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery(targetLocalCheckpoint);
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
return new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
}
return response;
}
private boolean isTargetSameHistory() {
@ -276,6 +282,32 @@ public class RecoverySourceHandler {
});
}
static final class SendFileResult {
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
final long totalSize;
final List<String> phase1ExistingFileNames;
final List<Long> phase1ExistingFileSizes;
final long existingTotalSize;
final TimeValue took;
SendFileResult(List<String> phase1FileNames, List<Long> phase1FileSizes, long totalSize,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long existingTotalSize, TimeValue took) {
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.totalSize = totalSize;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.existingTotalSize = existingTotalSize;
this.took = took;
}
static final SendFileResult EMPTY = new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L,
Collections.emptyList(), Collections.emptyList(), 0L, TimeValue.ZERO);
}
/**
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
@ -285,12 +317,16 @@ public class RecoverySourceHandler {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
cancellableThreads.checkForCancel();
// Total size of segment files that are recovered
long totalSize = 0;
// Total size of segment files that were able to be re-used
long existingTotalSize = 0;
final List<String> phase1FileNames = new ArrayList<>();
final List<Long> phase1FileSizes = new ArrayList<>();
final List<String> phase1ExistingFileNames = new ArrayList<>();
final List<Long> phase1ExistingFileSizes = new ArrayList<>();
final Store store = shard.store();
store.incRef();
try {
@ -331,8 +367,8 @@ public class RecoverySourceHandler {
} else {
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
for (StoreFileMetaData md : diff.identical) {
response.phase1ExistingFileNames.add(md.name());
response.phase1ExistingFileSizes.add(md.length());
phase1ExistingFileNames.add(md.name());
phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
@ -350,20 +386,16 @@ public class RecoverySourceHandler {
} else {
logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
}
response.phase1FileNames.add(md.name());
response.phase1FileSizes.add(md.length());
phase1FileNames.add(md.name());
phase1FileSizes.add(md.length());
totalSize += md.length();
}
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
response.phase1ExistingFileSizes, translogOps.get()));
phase1FileNames.size(), new ByteSizeValue(totalSize),
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
// How many bytes we've copied since we last called RateLimiter.pause
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes);
@ -417,27 +449,27 @@ public class RecoverySourceHandler {
}
}
}
logger.trace("recovery [phase1]: took [{}]", stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
final TimeValue took = stopWatch.totalTime();
logger.trace("recovery [phase1]: took [{}]", took);
return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
phase1ExistingFileSizes, existingTotalSize, took);
} catch (Exception e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
store.decRef();
}
}
void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis() - startEngineStart;
logger.trace("recovery [phase1]: remote engine start took [{}]", stopWatch.totalTime());
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
return tookTime;
}
/**
@ -454,102 +486,23 @@ public class RecoverySourceHandler {
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @return the local checkpoint on the target
* @return the send snapshot result
*/
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes)
throws IOException {
SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
final StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = result.totalOperations;
return result.targetLocalCheckpoint;
}
/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
* shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
}
static class SendSnapshotResult {
final long targetLocalCheckpoint;
final int totalOperations;
SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations) {
this.targetLocalCheckpoint = targetLocalCheckpoint;
this.totalOperations = totalOperations;
}
}
/**
* Send the given snapshot's operations with a sequence number greater than the specified staring sequence number to this handler's
* target node.
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
int ops = 0;
long size = 0;
int skippedOps = 0;
@ -615,7 +568,58 @@ public class RecoverySourceHandler {
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps);
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
}
/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
* shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
}
static final class SendSnapshotResult {
final long targetLocalCheckpoint;
final int totalOperations;
final TimeValue tookTime;
SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations, final TimeValue tookTime) {
this.targetLocalCheckpoint = targetLocalCheckpoint;
this.totalOperations = totalOperations;
this.tookTime = tookTime;
}
}
/**

View File

@ -46,6 +46,7 @@ import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
@ -190,7 +191,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
@ -229,7 +230,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
expectThrows(IllegalStateException.class, () ->
handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
@ -412,20 +413,23 @@ public class RecoverySourceHandlerTests extends ESTestCase {
recoverySettings.getChunkSize().bytesAsInt()) {
@Override
public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
phase1Called.set(true);
return super.phase1(snapshot, translogOps);
}
@Override
void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
prepareTargetForTranslogCalled.set(true);
return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
}
@Override
long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) {
SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
phase2Called.set(true);
return SequenceNumbers.UNASSIGNED_SEQ_NO;
return super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
}
};