Do not send recovery requests with CancellableThreads (#46287)

Previously, we send recovery requests using CancellableThreads because
we send requests and wait for responses in a blocking manner. With async
recovery, we no longer need to do so. Moreover, if we fail to submit a
request, then we can release the Store using an interruptible thread
which can risk invalidating the node lock.

This PR is the first step to avoid forking when releasing the Store.

Relates #45409
Relates #46178
This commit is contained in:
Nhat Nguyen 2019-09-04 11:01:01 -04:00
parent 1a29711b06
commit eb56d23421

View File

@ -520,9 +520,9 @@ public class RecoverySourceHandler {
final StepListener<Void> sendFilesStep = new StepListener<>();
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
final StepListener<Void> cleanFilesStep = new StepListener<>();
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
cancellableThreads.checkForCancel();
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);
sendFileInfoStep.whenComplete(r ->
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
@ -637,8 +637,8 @@ public class RecoverySourceHandler {
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
logger.trace("recovery [phase1]: prepare remote engine for translog");
cancellableThreads.execute(() ->
recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener));
cancellableThreads.checkForCancel();
recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
}
/**
@ -744,30 +744,29 @@ public class RecoverySourceHandler {
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
cancellableThreads.execute(() -> {
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
));
});
cancellableThreads.checkForCancel();
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
));
} else {
listener.onResponse(targetLocalCheckpoint);
}
@ -790,7 +789,8 @@ public class RecoverySourceHandler {
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
cancellableThreads.checkForCancel();
recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
@ -897,8 +897,9 @@ public class RecoverySourceHandler {
@Override
protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
cancellableThreads.checkForCancel();
recoveryTarget.writeFileChunk(
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
}
@Override
@ -925,13 +926,14 @@ public class RecoverySourceHandler {
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
cancellableThreads.checkForCancel();
recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
handleErrorOnSendFiles(store, e, mds);
throw e;
}))));
})));
}
private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {