diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 3a48702fd55..bc2196501fb 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -431,13 +431,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
 
     class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler {
 
         @Override
-        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
-                                    Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
-                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
+        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
+                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 34434f50b45..f360a68b7a8 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -197,51 +197,51 @@ public class RecoverySourceHandler {
         assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart +
             "] is lower than [" + startingSeqNo + "]";
 
-        final TimeValue prepareEngineTime;
-        try {
-            // For a sequence based recovery, the target can keep its local translog
-            prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-        } catch (final Exception e) {
-            throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
-        }
-
-        /*
-         * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
-         * This means that any document indexed into the primary after this will be replicated to this replica as well
-         * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
-         * all documents up to maxSeqNo in phase2.
-         */
-        runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
-            shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
-
-        final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-        /*
-         * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-         * operations in the required range will be available for replaying from the translog of the source.
-         */
-        cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
-            logger.trace("snapshot translog for recovery; current size is [{}]",
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-        }
-
-        final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
-        resources.add(phase2Snapshot);
-        // we can release the retention lock here because the snapshot itself will retain the required operations.
-        IOUtils.close(retentionLock);
-        // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-        // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-        final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-        final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+        final StepListener prepareEngineStep = new StepListener<>();
+        // For a sequence based recovery, the target can keep its local translog
+        prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+            shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
 
         final StepListener sendSnapshotStep = new StepListener<>();
-        phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
-            maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
-        sendSnapshotStep.whenComplete(
-            r -> IOUtils.close(phase2Snapshot),
-            e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
+        prepareEngineStep.whenComplete(prepareEngineTime -> {
+            /*
+             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
+             * This means that any document indexed into the primary after this will be replicated to this replica as well
+             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
+             * all documents up to maxSeqNo in phase2.
+             */
+            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
+                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
+
+            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+            /*
+             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+             * operations in the required range will be available for replaying from the translog of the source.
+             */
+            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+            if (logger.isTraceEnabled()) {
+                logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
+                logger.trace("snapshot translog for recovery; current size is [{}]",
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
+            }
+            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+            resources.add(phase2Snapshot);
+            // we can release the retention lock here because the snapshot itself will retain the required operations.
+            retentionLock.close();
+            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+            phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+                maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+            sendSnapshotStep.whenComplete(
+                r -> IOUtils.close(phase2Snapshot),
+                e -> {
+                    IOUtils.closeWhileHandlingException(phase2Snapshot);
+                    onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
+                });
+
+        }, onFailure);
+
         final StepListener finalizeStep = new StepListener<>();
         sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
@@ -251,7 +251,7 @@ public class RecoverySourceHandler {
         final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
             sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
             sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
-            prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+            prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
         try {
             wrappedListener.onResponse(response);
         } finally {
@@ -484,16 +484,21 @@ public class RecoverySourceHandler {
         }
     }
 
-    TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
        StopWatch stopWatch = new StopWatch().start();
-        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        final ActionListener wrappedListener = ActionListener.wrap(
+            nullVal -> {
+                stopWatch.stop();
+                final TimeValue tookTime = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
+                listener.onResponse(tookTime);
+            },
+            e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
-        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
-        stopWatch.stop();
-        final TimeValue tookTime = stopWatch.totalTime();
-        logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
-        return tookTime;
+        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        cancellableThreads.execute(() ->
+            recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 5950f71006a..b3004905429 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -366,9 +366,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     /*** Implementation of {@link RecoveryTargetHandler } */
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-        state().getTranslog().totalOperations(totalTranslogOps);
-        indexShard().openEngineAndSkipTranslogRecovery();
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+        ActionListener.completeWith(listener, () -> {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            indexShard().openEngineAndSkipTranslogRecovery();
+            return null;
+        });
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index 3372c933559..0fa5e94c63c 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -35,7 +35,7 @@ public interface RecoveryTargetHandler {
      * @param fileBasedRecovery whether or not this call is part of an file based recovery
      * @param totalTranslogOps total translog operations expected to be sent
      */
-    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
+    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener);
 
     /**
      * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index ba703aeee28..0799cc65951 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -77,11 +77,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
-            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
-            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+            new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 5fca37a7188..f67ce8de142 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -198,13 +199,14 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
         Future fut = shards.asyncRecoverReplica(replica,
             (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
             @Override
-            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+                                                     ActionListener listener) {
                 try {
                     indexedOnPrimary.await();
                 } catch (InterruptedException e) {
                     throw new AssertionError(e);
                 }
-                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
             }
         });
         fut.get();
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 0d58239fd4a..1ea90ec6e8f 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2573,8 +2573,8 @@ public class IndexShardTests extends IndexShardTestCase {
             }) {
             // we're only checking that listeners are called when the engine is open, before there is no point
             @Override
-            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                 assertListenerCalled.accept(replica);
             }
 
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 0cecc925b24..cd495b7e17b 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -491,9 +491,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
 
             @Override
-            TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+            void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
                 prepareTargetForTranslogCalled.set(true);
-                return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
+                super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
             }
 
             @Override
@@ -700,7 +700,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
     class TestRecoveryTargetHandler implements RecoveryTargetHandler {
 
        @Override
-        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
+        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
        }
 
        @Override
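
The hunks above replace a blocking prepare-engine call (the old txGet() round trip with a try/catch) with listener callbacks, so the next recovery phase is chained off a completion callback instead of a parked thread. As a rough, self-contained sketch of that chaining style in plain Java (this is not Elasticsearch's ActionListener/StepListener API; every name below is invented for the illustration):

// Minimal sketch of listener-based step chaining; illustrative names only.
import java.util.concurrent.CompletableFuture;

interface Listener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

final class PrepareThenPhase2Sketch {

    // Stand-in for the prepare-target-for-translog step: it completes the listener
    // asynchronously rather than blocking until the remote engine is open.
    static void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, Listener<Long> listener) {
        long startNanos = System.nanoTime();
        CompletableFuture.runAsync(() -> {
            // ... the remote call that opens the target engine would happen here ...
        }).whenComplete((ignored, error) -> {
            if (error != null) {
                listener.onFailure(new RuntimeException("prepare target for translog failed", error));
            } else {
                listener.onResponse(System.nanoTime() - startNanos); // report how long the step took
            }
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        // The next phase is chained off the listener, mirroring prepareEngineStep.whenComplete(...).
        prepareTargetForTranslog(false, 42, new Listener<Long>() {
            @Override
            public void onResponse(Long tookNanos) {
                System.out.println("prepare engine took " + tookNanos + "ns; phase2 would start here");
                done.complete(null);
            }

            @Override
            public void onFailure(Exception e) {
                done.completeExceptionally(e);
            }
        });
        done.get(); // only so the demo waits before the JVM exits
    }
}

The point of the pattern, as the diff suggests, is that the source handler no longer ties up a thread waiting on the PREPARE_TRANSLOG round trip, and failures flow through the same listener (onFailure) instead of a try/catch around a blocking call.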