diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 0b67d62575f..0c976cabac9 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -520,9 +520,9 @@ public class RecoverySourceHandler {
                 final StepListener<Void> sendFilesStep = new StepListener<>();
                 final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
                 final StepListener<Void> cleanFilesStep = new StepListener<>();
-                cancellableThreads.execute(() ->
-                    recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
-                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
+                cancellableThreads.checkForCancel();
+                recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
+                    phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);
 
                 sendFileInfoStep.whenComplete(r ->
                     sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
@@ -637,8 +637,8 @@ public class RecoverySourceHandler {
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
         logger.trace("recovery [phase1]: prepare remote engine for translog");
-        cancellableThreads.execute(() ->
-            recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener));
+        cancellableThreads.checkForCancel();
+        recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
     }
 
     /**
@@ -744,30 +744,29 @@ public class RecoverySourceHandler {
         final List<Translog.Operation> operations = nextBatch.get();
         // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
         if (operations.isEmpty() == false || firstBatch) {
-            cancellableThreads.execute(() -> {
-                recoveryTarget.indexTranslogOperations(
-                    operations,
-                    totalTranslogOps,
-                    maxSeenAutoIdTimestamp,
-                    maxSeqNoOfUpdatesOrDeletes,
-                    retentionLeases,
-                    mappingVersionOnPrimary,
-                    ActionListener.wrap(
-                        newCheckpoint -> {
-                            sendBatch(
-                                nextBatch,
-                                false,
-                                SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
-                                totalTranslogOps,
-                                maxSeenAutoIdTimestamp,
-                                maxSeqNoOfUpdatesOrDeletes,
-                                retentionLeases,
-                                mappingVersionOnPrimary,
-                                listener);
-                        },
-                        listener::onFailure
-                    ));
-            });
+            cancellableThreads.checkForCancel();
+            recoveryTarget.indexTranslogOperations(
+                operations,
+                totalTranslogOps,
+                maxSeenAutoIdTimestamp,
+                maxSeqNoOfUpdatesOrDeletes,
+                retentionLeases,
+                mappingVersionOnPrimary,
+                ActionListener.wrap(
+                    newCheckpoint -> {
+                        sendBatch(
+                            nextBatch,
+                            false,
+                            SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
+                            totalTranslogOps,
+                            maxSeenAutoIdTimestamp,
+                            maxSeqNoOfUpdatesOrDeletes,
+                            retentionLeases,
+                            mappingVersionOnPrimary,
+                            listener);
+                    },
+                    listener::onFailure
+                ));
         } else {
             listener.onResponse(targetLocalCheckpoint);
         }
@@ -790,7 +789,8 @@ public class RecoverySourceHandler {
             shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
         final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
         final StepListener<Void> finalizeListener = new StepListener<>();
-        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
+        cancellableThreads.checkForCancel();
+        recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
         finalizeListener.whenComplete(r -> {
             runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
                 shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
@@ -897,8 +897,9 @@ public class RecoverySourceHandler {
 
             @Override
             protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
-                cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
-                    request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
+                cancellableThreads.checkForCancel();
+                recoveryTarget.writeFileChunk(
+                    request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
             }
 
             @Override
@@ -925,13 +926,14 @@ public class RecoverySourceHandler {
         // Once the files have been renamed, any other files that are not
         // related to this recovery (out of date segments, for example)
         // are deleted
-        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+        cancellableThreads.checkForCancel();
+        recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
             ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
                 StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
                 ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
                 handleErrorOnSendFiles(store, e, mds);
                 throw e;
-            }))));
+            })));
     }
 
     private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
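Every hunk in this diff applies the same pattern: the recovery-target calls take an ActionListener and complete asynchronously, so instead of running them inside cancellableThreads.execute(...) / executeIO(...), the code performs a plain cancellableThreads.checkForCancel() and then issues the call directly. The short Java program below is a minimal, self-contained sketch of that before/after shape; the Cancellable helper and receiveFileInfoAsync method are hypothetical stand-ins for illustration only, not the Elasticsearch CancellableThreads or RecoveryTargetHandler APIs.

import java.util.function.Consumer;

final class CheckForCancelSketch {

    /** Hypothetical stand-in for a cancellation flag such as CancellableThreads (not the real class). */
    static final class Cancellable {
        private volatile String cancelReason;

        void cancel(String reason) {
            cancelReason = reason;
        }

        /** Throws if cancel() was called, mirroring the fail-fast check used in the diff. */
        void checkForCancel() {
            if (cancelReason != null) {
                throw new IllegalStateException("recovery was cancelled: " + cancelReason);
            }
        }
    }

    /** Hypothetical asynchronous call: hands the response to a callback and returns immediately. */
    static void receiveFileInfoAsync(String fileName, Consumer<String> onResponse) {
        onResponse.accept("acknowledged " + fileName);
    }

    public static void main(String[] args) {
        Cancellable cancellableThreads = new Cancellable();

        // After the change: fail fast if the operation was cancelled, then issue the
        // asynchronous call directly instead of wrapping it in execute(() -> ...).
        cancellableThreads.checkForCancel();
        receiveFileInfoAsync("_0.cfs", response -> System.out.println(response));
    }
}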