From 55b3ec8d7b5020770be9ee02545933313c174680 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Sat, 29 Jun 2019 18:26:08 -0400
Subject: [PATCH] Make peer recovery clean files step async (#43787)

Makes the clean files step of peer recovery asynchronous: cleanFiles on the
recovery target now completes an ActionListener instead of returning (or
throwing) on the calling thread, and RecoverySourceHandler chains the file
copy, translog preparation, operation replay, and finalization phases through
StepListeners.

Relates #36195
---
 .../recovery/PeerRecoveryTargetService.java   |   8 +-
 .../recovery/RecoverySourceHandler.java       | 194 +++++++++---------
 .../indices/recovery/RecoveryTarget.java      | 100 ++++-----
 .../recovery/RecoveryTargetHandler.java       |   3 +-
 .../recovery/RemoteRecoveryTargetHandler.java |   6 +-
 .../IndexLevelReplicationTests.java           |  17 +-
 .../RecoveryDuringReplicationTests.java       |   5 +-
 .../PeerRecoveryTargetServiceTests.java       |   6 +-
 .../recovery/RecoverySourceHandlerTests.java  |   8 +-
 .../indices/recovery/RecoveryTests.java       |   6 +-
 .../indices/recovery/AsyncRecoveryTarget.java |   6 +-
 11 files changed, 184 insertions(+), 175 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 6b1a893667f..50abeb2fb7a 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -544,10 +544,10 @@ public class PeerRecoveryTargetService implements IndexEventListener {
 
         @Override
         public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
-                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.CLEAN_FILES, request);
+                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
+                    ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
             }
         }
     }
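
The handler no longer blocks the transport thread: instead of calling cleanFiles and then sending an empty response inline, it hands the channel to a listener that sends the response (or the failure) whenever the asynchronous work finishes. A minimal self-contained sketch of that shape, using hypothetical stand-in types rather than the real ChannelActionListener/TransportChannel API:

    import java.util.function.Function;

    // Stand-in for ActionListener: completes with a value or an exception.
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);

        // Same idea as ActionListener.map in the hunk above: adapt a Void
        // completion into the response type the channel expects.
        static <A, B> Listener<A> map(Listener<B> delegate, Function<A, B> fn) {
            return new Listener<A>() {
                public void onResponse(A value) {
                    try { delegate.onResponse(fn.apply(value)); }
                    catch (Exception e) { delegate.onFailure(e); }
                }
                public void onFailure(Exception e) { delegate.onFailure(e); }
            };
        }
    }

    class CleanFilesHandlerSketch {
        // Hypothetical channel, analogous to TransportChannel.
        interface Channel {
            void sendResponse(String response);
            void sendError(Exception e);
        }

        // Hypothetical asynchronous target method, analogous to RecoveryTarget#cleanFiles.
        void cleanFilesAsync(Listener<Void> listener) {
            listener.onResponse(null); // stands in for the real asynchronous work
        }

        void messageReceived(Channel channel) {
            // Listener that writes back to the channel, like ChannelActionListener.
            Listener<String> channelListener = new Listener<String>() {
                public void onResponse(String r) { channel.sendResponse(r); }
                public void onFailure(Exception e) { channel.sendError(e); }
            };
            // The work completes the channel later; nothing blocks here.
            cleanFilesAsync(Listener.map(channelListener, aVoid -> "EMPTY_RESPONSE"));
        }
    }
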
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index f3e10c13c21..959c13ccb89 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -40,6 +40,7 @@ import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -75,7 +76,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.IntSupplier;
 import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
@@ -160,15 +161,21 @@ public class RecoverySourceHandler {
             final long startingSeqNo;
             final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
                 isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
-            final SendFileResult sendFileResult;
+
+            final StepListener<SendFileResult> sendFileStep = new StepListener<>();
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            final StepListener<Void> finalizeStep = new StepListener<>();
+
             if (isSequenceNumberBasedRecovery) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                 startingSeqNo = request.startingSeqNo();
-                sendFileResult = SendFileResult.EMPTY;
+                sendFileStep.onResponse(SendFileResult.EMPTY);
             } else {
-                final Engine.IndexCommitRef phase1Snapshot;
+                final Engine.IndexCommitRef safeCommitRef;
                 try {
-                    phase1Snapshot = shard.acquireSafeIndexCommit();
+                    safeCommitRef = shard.acquireSafeIndexCommit();
+                    resources.add(safeCommitRef);
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }
@@ -177,24 +184,29 @@ public class RecoverySourceHandler {
                 startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-                    sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps);
+                    shard.store().incRef();
+                    final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+                    resources.add(releaseStore);
+                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                        try {
+                            IOUtils.close(safeCommitRef, releaseStore);
+                        } catch (final IOException ex) {
+                            logger.warn("releasing snapshot caused exception", ex);
+                        }
+                    });
+                    phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
                 } catch (final Exception e) {
-                    throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
-                } finally {
-                    try {
-                        IOUtils.close(phase1Snapshot);
-                    } catch (final IOException ex) {
-                        logger.warn("releasing snapshot caused exception", ex);
-                    }
+                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
                 }
             }
             assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
 
-            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-            // For a sequence based recovery, the target can keep its local translog
-            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            sendFileStep.whenComplete(r -> {
+                // For a sequence based recovery, the target can keep its local translog
+                prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            }, onFailure);
+
             prepareEngineStep.whenComplete(prepareEngineTime -> {
                 /*
                  * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
@@ -231,12 +243,12 @@ public class RecoverySourceHandler {
 
             }, onFailure);
 
-            final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
 
             finalizeStep.whenComplete(r -> {
                 final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                 final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
+                final SendFileResult sendFileResult = sendFileStep.result();
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
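
recoverToTarget is now expressed as a chain of StepListeners: the file copy step feeds translog preparation, which feeds the operation replay step, which feeds finalization, and only then is the RecoveryResponse assembled from the collected step results. A rough sketch of the same control flow using plain CompletableFuture as a stand-in for StepListener (the method names and payload types here are illustrative, not the handler's real API):

    import java.util.concurrent.CompletableFuture;

    class RecoveryStepsSketch {
        // Each step is asynchronous and completes a future, like a StepListener.
        CompletableFuture<String> sendFiles() { return CompletableFuture.completedFuture("fileResult"); }
        CompletableFuture<Long> prepareTargetForTranslog() { return CompletableFuture.completedFuture(5L); }
        CompletableFuture<Long> sendSnapshot() { return CompletableFuture.completedFuture(42L); }
        CompletableFuture<Void> finalizeRecovery(long targetLocalCheckpoint) {
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<String> recoverToTarget() {
            CompletableFuture<String> sendFileStep = sendFiles();
            // Each thenCompose mirrors a StepListener.whenComplete(...) above:
            // the next step only starts once the previous one has succeeded,
            // and a failure in any stage short-circuits the rest, like the
            // shared onFailure handler in the patch.
            return sendFileStep
                .thenCompose(fileResult -> prepareTargetForTranslog())
                .thenCompose(prepareTime -> sendSnapshot())
                .thenCompose(this::finalizeRecovery)
                .thenApply(ignored ->
                    // Like building RecoveryResponse from sendFileStep.result()
                    // and sendSnapshotStep.result() once finalization is done.
                    "response built from " + sendFileStep.join());
        }
    }
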
@@ -333,18 +345,17 @@ public class RecoverySourceHandler {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+    void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
-        long totalSize = 0;
+        long totalSizeInBytes = 0;
         // Total size of segment files that were able to be re-used
-        long existingTotalSize = 0;
+        long existingTotalSizeInBytes = 0;
         final List<String> phase1FileNames = new ArrayList<>();
         final List<Long> phase1FileSizes = new ArrayList<>();
         final List<String> phase1ExistingFileNames = new ArrayList<>();
         final List<Long> phase1ExistingFileSizes = new ArrayList<>();
         final Store store = shard.store();
-        store.incRef();
         try {
             StopWatch stopWatch = new StopWatch().start();
             final Store.MetadataSnapshot recoverySourceMetadata;
@@ -370,12 +381,12 @@ public class RecoverySourceHandler {
                 for (StoreFileMetaData md : diff.identical) {
                     phase1ExistingFileNames.add(md.name());
                     phase1ExistingFileSizes.add(md.length());
-                    existingTotalSize += md.length();
+                    existingTotalSizeInBytes += md.length();
                     if (logger.isTraceEnabled()) {
                         logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                                         " size [{}]", md.name(), md.checksum(), md.length());
                     }
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }
                 List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                 phase1Files.addAll(diff.different);
@@ -389,75 +400,33 @@ public class RecoverySourceHandler {
                     }
                     phase1FileNames.add(md.name());
                     phase1FileSizes.add(md.length());
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }
 
                 logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
-                    phase1FileNames.size(), new ByteSizeValue(totalSize),
-                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
+                    phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
+                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
                 cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
-                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
+                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
                 sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
-                // Send the CLEAN_FILES request, which takes all of the files that
-                // were transferred and renames them from their temporary file
-                // names to the actual file names. It also writes checksums for
-                // the files after they have been renamed.
-                //
-                // Once the files have been renamed, any other files that are not
-                // related to this recovery (out of date segments, for example)
-                // are deleted
-                try {
-                    cancellableThreads.executeIO(() ->
-                        recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
-                } catch (RemoteTransportException | IOException targetException) {
-                    final IOException corruptIndexException;
-                    // we realized that after the index was copied and we wanted to finalize the recovery
-                    // the index was corrupted:
-                    //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                    //   - maybe due to old segments without checksums or length only checks
-                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
-                        try {
-                            final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
-                            StoreFileMetaData[] metadata =
-                                    StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
-                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
-                            for (StoreFileMetaData md : metadata) {
-                                cancellableThreads.checkForCancel();
-                                logger.debug("checking integrity for file {} after remove corruption exception", md);
-                                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                    shard.failShard("recovery", corruptIndexException);
-                                    logger.warn("Corrupted file detected {} checksum mismatch", md);
-                                    throw corruptIndexException;
-                                }
-                            }
-                        } catch (IOException ex) {
-                            targetException.addSuppressed(ex);
-                            throw targetException;
-                        }
-                        // corruption has happened on the way to replica
-                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                                "checksums are ok", null);
-                        exception.addSuppressed(targetException);
-                        logger.warn(() -> new ParameterizedMessage(
-                                "{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
-                                shard.shardId(), request.targetNode()), corruptIndexException);
-                        throw exception;
-                    } else {
-                        throw targetException;
-                    }
-                }
+                final long totalSize = totalSizeInBytes;
+                final long existingTotalSize = existingTotalSizeInBytes;
+                cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
+                    final TimeValue took = stopWatch.totalTime();
+                    logger.trace("recovery [phase1]: took [{}]", took);
+                    return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, existingTotalSize, took);
+                }));
             } else {
                 logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
                     recoverySourceMetadata.getSyncId());
+                final TimeValue took = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: took [{}]", took);
+                listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames,
+                    phase1ExistingFileSizes, existingTotalSizeInBytes, took));
             }
-            final TimeValue took = stopWatch.totalTime();
-            logger.trace("recovery [phase1]: took [{}]", took);
-            return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
-                phase1ExistingFileSizes, existingTotalSize, took);
         } catch (Exception e) {
-            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
-        } finally {
-            store.decRef();
+            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e);
         }
     }
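
With phase1 no longer blocking, it cannot release the store reference and the safe-commit snapshot in a finally block; instead the caller increments the reference up front and registers a release-once cleanup that runs when the send-file step completes, successfully or not. A small self-contained sketch of that idiom (the names mirror Releasables.releaseOnce and StepListener, but the types here are stand-ins):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    class ReleaseOnCompletionSketch {
        interface Releasable { void close(); }

        // Like Releasables.releaseOnce: the underlying release runs at most once,
        // even if both the success and the failure path try to close it.
        static Releasable releaseOnce(Releasable inner) {
            AtomicBoolean released = new AtomicBoolean();
            return () -> { if (released.compareAndSet(false, true)) inner.close(); };
        }

        // Stand-in for a StepListener: remembers the completion callbacks.
        static class Step<T> {
            private Consumer<T> onSuccess = t -> {};
            private Consumer<Exception> onError = e -> {};
            void whenComplete(Consumer<T> onSuccess, Consumer<Exception> onError) {
                this.onSuccess = onSuccess;
                this.onError = onError;
            }
            void onResponse(T value) { onSuccess.accept(value); }
            void onFailure(Exception e) { onError.accept(e); }
        }

        void run() {
            Releasable storeRef = releaseOnce(() -> System.out.println("store ref released"));
            Step<String> sendFileStep = new Step<>();
            // Cleanup is tied to step completion rather than to a finally block,
            // because the step may complete on another thread much later.
            sendFileStep.whenComplete(r -> storeRef.close(), e -> storeRef.close());
            // ... kick off the asynchronous file copy, which eventually calls:
            sendFileStep.onResponse("files sent");
        }
    }
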
 
@@ -695,7 +664,7 @@ public class RecoverySourceHandler {
                 '}';
     }
 
-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+    void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception {
         ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
         final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
         final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
@@ -720,7 +689,7 @@ public class RecoverySourceHandler {
                     }
                     final long requestFilePosition = position;
                     cancellableThreads.executeIO(() ->
-                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(),
                             ActionListener.wrap(
                                 r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId),
                                 e -> {
@@ -741,24 +710,53 @@ public class RecoverySourceHandler {
             cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
         if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+            handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
         }
     }
 
-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
+    private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
+                            long globalCheckpoint, ActionListener<Void> listener) {
+        // Send the CLEAN_FILES request, which takes all of the files that
+        // were transferred and renames them from their temporary file
+        // names to the actual file names. It also writes checksums for
+        // the files after they have been renamed.
+        //
+        // Once the files have been renamed, any other files that are not
+        // related to this recovery (out of date segments, for example)
+        // are deleted
+        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+            ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
+                StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
+                ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
+                handleErrorOnSendFiles(store, e, mds);
+                throw e;
+            }))));
+    }
+
+    private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
+        final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
+        if (corruptIndexException != null) {
+            Exception localException = null;
+            for (StoreFileMetaData md : mds) {
+                cancellableThreads.checkForCancel();
+                logger.debug("checking integrity for file {} after remove corruption exception", md);
+                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                    logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                    if (localException == null) {
+                        localException = corruptIndexException;
+                    }
+                    failEngine(corruptIndexException);
+                }
+            }
+            if (localException != null) {
+                throw localException;
             } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
+                RemoteTransportException remoteException = new RemoteTransportException(
                     "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
+                remoteException.addSuppressed(e);
                 logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
+                    shardId, request.targetNode(), mds), corruptIndexException);
+                throw remoteException;
             }
         } else {
             throw e;
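
cleanFiles on the source side now wires its corruption handling into the listener's failure path: the success path passes straight through, while the failure handler runs the integrity checks and may replace the exception before propagating it downstream. A compact sketch of that decorator shape with stand-in types (not the real ActionListener.delegateResponse/completeWith helpers):

    import java.util.function.BiConsumer;

    class FailurePathDecoratorSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        // Successes pass straight through; failures are handed to a custom
        // handler together with the downstream listener.
        static <T> Listener<T> delegateFailure(Listener<T> delegate,
                                               BiConsumer<Listener<T>, Exception> failureHandler) {
            return new Listener<T>() {
                public void onResponse(T value) { delegate.onResponse(value); }
                public void onFailure(Exception e) { failureHandler.accept(delegate, e); }
            };
        }

        static void cleanFiles(Listener<Void> listener) {
            Listener<Void> wrapped = delegateFailure(listener, (downstream, e) -> {
                try {
                    // Run extra diagnostics before propagating, e.g. checking the
                    // local files' checksums as handleErrorOnSendFiles does.
                    boolean locallyCorrupted = checkLocalIntegrity();
                    downstream.onFailure(locallyCorrupted
                        ? new IllegalStateException("local copy corrupted", e)
                        : e);
                } catch (Exception diagnosticFailure) {
                    downstream.onFailure(diagnosticFailure);
                }
            });
            // ... send the CLEAN_FILES request, completing `wrapped` when it returns.
            wrapped.onResponse(null);
        }

        static boolean checkLocalIntegrity() { return false; } // placeholder
    }
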
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index bbd0cea04af..55aa5b22595 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -392,57 +392,61 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        state().getTranslog().totalOperations(totalTranslogOps);
-        // first, we go and move files that were created with the recovery id suffix to
-        // the actual names, its ok if we have a corrupted index here, since we have replicas
-        // to recover from in case of a full cluster shutdown just when this code executes...
-        multiFileWriter.renameAllTempFiles();
-        final Store store = store();
-        store.incRef();
-        try {
-            store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
-            if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
-                store.ensureIndexHasHistoryUUID();
-            }
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
-            store.associateIndexWithNewTranslog(translogUUID);
-
-            if (indexShard.getRetentionLeases().leases().isEmpty()) {
-                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
-                indexShard.persistRetentionLeases();
-                assert indexShard.loadRetentionLeases().leases().isEmpty();
-            } else {
-                assert indexShard.assertRetentionLeasesPersisted();
-            }
-
-        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
-            // this is a fatal exception at this stage.
-            // this means we transferred files from the remote that have not be checksummed and they are
-            // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
-            // source shard since this index might be broken there as well? The Source can handle this and checks
-            // its content on disk if possible.
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            // first, we go and move files that were created with the recovery id suffix to
+            // the actual names, it's ok if we have a corrupted index here, since we have replicas
+            // to recover from in case of a full cluster shutdown just when this code executes...
+            multiFileWriter.renameAllTempFiles();
+            final Store store = store();
+            store.incRef();
             try {
-                try {
-                    store.removeCorruptionMarker();
-                } finally {
-                    Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
+                store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
+                if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
+                    store.ensureIndexHasHistoryUUID();
                 }
-            } catch (Exception e) {
-                logger.debug("Failed to clean lucene index", e);
-                ex.addSuppressed(e);
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
+                store.associateIndexWithNewTranslog(translogUUID);
+
+                if (indexShard.getRetentionLeases().leases().isEmpty()) {
+                    // if empty, may be a fresh IndexShard, so write an empty leases file to disk
+                    indexShard.persistRetentionLeases();
+                    assert indexShard.loadRetentionLeases().leases().isEmpty();
+                } else {
+                    assert indexShard.assertRetentionLeasesPersisted();
+                }
+
+            } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
+                // this is a fatal exception at this stage.
+                // this means we transferred files from the remote that have not been checksummed and they are
+                // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
+                // source shard since this index might be broken there as well? The Source can handle this and checks
+                // its content on disk if possible.
+                try {
+                    try {
+                        store.removeCorruptionMarker();
+                    } finally {
+                        Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
+                    }
+                } catch (Exception e) {
+                    logger.debug("Failed to clean lucene index", e);
+                    ex.addSuppressed(e);
+                }
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } catch (Exception ex) {
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } finally {
+                store.decRef();
             }
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } catch (Exception ex) {
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } finally {
-            store.decRef();
-        }
+            return null;
+        });
     }
 
     @Override
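
On the target, the body of cleanFiles is otherwise unchanged; it is simply wrapped in ActionListener.completeWith, which turns a block that either returns a value or throws into a listener completion. A minimal self-contained stand-in for that helper:

    class CompleteWithSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        // A supplier that is allowed to throw, like Elasticsearch's CheckedSupplier.
        interface CheckedSupplier<T> {
            T get() throws Exception;
        }

        // Stand-in for ActionListener.completeWith: run the block and route the
        // outcome (value or exception) into the listener.
        static <T> void completeWith(Listener<T> listener, CheckedSupplier<T> block) {
            T result;
            try {
                result = block.get();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        }

        // Usage, mirroring RecoveryTarget#cleanFiles: the old synchronous body
        // keeps its try/catch/finally structure and just returns null at the end.
        static void cleanFiles(Listener<Void> listener) {
            completeWith(listener, () -> {
                // rename temp files, verify the store, create the empty translog ...
                return null;
            });
        }
    }
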
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index d03fe42d901..89f4cb22c2b 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -26,7 +26,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 
-import java.io.IOException;
 import java.util.List;
 
 public interface RecoveryTargetHandler {
@@ -99,7 +98,7 @@ public interface RecoveryTargetHandler {
      * @param globalCheckpoint the global checkpoint on the primary
      * @param sourceMetaData   meta data of the source store
      */
-    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;
+    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener);
 
     /** writes a partial file chunk to the target store */
     void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index ec3c22d42a1..6b786fdae4d 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -140,11 +140,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
                 new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
                 TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
+                    in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
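
The remote handler previously blocked the calling thread on .txGet() until the CLEAN_FILES response arrived; now the transport response handler completes the caller's listener instead. The difference, sketched with plain CompletableFuture as a stand-in for the transport layer (submitRequest here is a hypothetical placeholder, not the real TransportService method):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    class RemoteCleanFilesSketch {
        // Stand-in for sending a request: the reply arrives on some other thread.
        static CompletableFuture<String> submitRequest(String request) {
            return CompletableFuture.supplyAsync(() -> "EMPTY_RESPONSE");
        }

        // Before: block the recovery thread until the remote node answers.
        static void cleanFilesBlocking(String request) throws Exception {
            submitRequest(request).get(); // the .txGet() equivalent
        }

        // After: register a completion callback and return immediately; the
        // callback plays the role of ActionListenerResponseHandler.
        static void cleanFilesAsync(String request,
                                    Runnable onSuccess, Consumer<Exception> onFailure) {
            submitRequest(request).whenComplete((response, throwable) -> {
                if (throwable != null) {
                    onFailure.accept(new Exception(throwable));
                } else {
                    onSuccess.run(); // maps the empty response to a Void completion
                }
            });
        }
    }
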
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index e25557eaabc..97e5210c9d0 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -122,14 +122,15 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                 (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
                     @Override
                     public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
-                                           Store.MetadataSnapshot sourceMetaData) throws IOException {
-                        super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
-                        latch.countDown();
-                        try {
-                            latch.await();
-                        } catch (InterruptedException e) {
-                            throw new AssertionError(e);
-                        }
+                                           Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener) {
+                        super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, ActionListener.runAfter(listener, () -> {
+                            latch.countDown();
+                            try {
+                                latch.await();
+                            } catch (InterruptedException e) {
+                                throw new AssertionError(e);
+                            }
+                        }));
                     }
                 });
             future.get();
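
The test can no longer run its latch logic after a blocking super call; instead it decorates the listener so the latch work runs once the asynchronous cleanFiles completes, whether it succeeded or failed. A tiny stand-in for ActionListener.runAfter:

    class RunAfterSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        // Run `after` once the delegate has been completed, on either path.
        static <T> Listener<T> runAfter(Listener<T> delegate, Runnable after) {
            return new Listener<T>() {
                public void onResponse(T value) {
                    try { delegate.onResponse(value); } finally { after.run(); }
                }
                public void onFailure(Exception e) {
                    try { delegate.onFailure(e); } finally { after.run(); }
                }
            };
        }
    }
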
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index c94c289f51f..c60f32132c6 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -848,9 +848,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
         }
 
         @Override
-        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                               ActionListener<Void> listener) {
             blockIfNeeded(RecoveryState.Stage.INDEX);
-            super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+            super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
         }
 
         @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index bb4c25e6186..5a6d7fbaa17 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -28,6 +28,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
@@ -189,7 +190,10 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         for (Thread sender : senders) {
             sender.join();
         }
-        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
+        PlainActionFuture<Void> cleanFilesFuture = new PlainActionFuture<>();
+        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+            sourceSnapshot, cleanFilesFuture);
+        cleanFilesFuture.actionGet();
         recoveryTarget.decRef();
         Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
         Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
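
Because cleanFiles no longer returns synchronously, the test passes in a future-backed listener and blocks on it, which is what PlainActionFuture provides. A stand-in built on CompletableFuture (the cleanFiles method below is a hypothetical async API, not the real RecoveryTarget):

    import java.util.concurrent.CompletableFuture;

    class FutureListenerSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        // A listener that is also a future: the async code completes it and the
        // test blocks on it, like PlainActionFuture#actionGet.
        static class FutureListener<T> extends CompletableFuture<T> implements Listener<T> {
            @Override public void onResponse(T value) { complete(value); }
            @Override public void onFailure(Exception e) { completeExceptionally(e); }
        }

        static void cleanFiles(Listener<Void> listener) { listener.onResponse(null); }

        public static void main(String[] args) {
            FutureListener<Void> future = new FutureListener<>();
            cleanFiles(future);
            future.join(); // the test waits here for the asynchronous step to finish
        }
    }
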
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index b00e89575cc..b69033ba9b4 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -98,7 +98,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
-import java.util.function.Supplier;
 import java.util.zip.CRC32;
 
 import static java.util.Collections.emptyMap;
@@ -478,9 +477,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
                 between(1, 8)) {
 
             @Override
-            public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+            void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
                 phase1Called.set(true);
-                return super.phase1(snapshot, globalCheckpoint, translogOps);
+                super.phase1(snapshot, globalCheckpoint, translogOps, listener);
             }
 
             @Override
@@ -758,7 +757,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         }
 
         @Override
-        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
+        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                               ActionListener<Void> listener) {
         }
 
         @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index c3f6a3aae89..28e84c1210a 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -47,7 +47,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.SnapshotMatchers;
 import org.elasticsearch.index.translog.Translog;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -335,9 +334,10 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
                 assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
             }
             @Override
-            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                                   ActionListener<Void> listener) {
                 assertThat(globalCheckpoint, equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
-                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
             }
         }, true, true);
         List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
index d5a7ab8109e..cf2b768f46d 100644
--- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
@@ -29,7 +29,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -75,8 +74,9 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
     }
 
     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        executor.execute(() -> target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener));
     }
 
     @Override
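
AsyncRecoveryTarget simply re-dispatches each call onto an executor, so tests exercise listener completion on a different thread than the caller. A minimal sketch of that wrapper with stand-in types:

    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class AsyncTargetSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }
        interface Target {
            void cleanFiles(Listener<Void> listener);
        }

        // Decorator that moves every call (and therefore every listener
        // completion) onto the given executor, as AsyncRecoveryTarget does.
        static Target onExecutor(Target inner, Executor executor) {
            return listener -> executor.execute(() -> inner.cleanFiles(listener));
        }

        public static void main(String[] args) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Target direct = listener -> listener.onResponse(null);
            Target async = onExecutor(direct, pool);
            async.cleanFiles(new Listener<Void>() {
                public void onResponse(Void v) { System.out.println("completed off-thread"); }
                public void onFailure(Exception e) { e.printStackTrace(); }
            });
            pool.shutdown(); // previously submitted work still runs
        }
    }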