From 598e00a689056fcbdf72963a1d501248b610f08c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 30 Jun 2019 17:47:38 -0400 Subject: [PATCH] Make peer recovery send file info step async (#43792) Relates #36195 --- .../recovery/PeerRecoveryTargetService.java | 10 ++++---- .../recovery/RecoverySourceHandler.java | 22 ++++++++++------ .../indices/recovery/RecoveryTarget.java | 25 +++++++++++-------- .../recovery/RecoveryTargetHandler.java | 3 ++- .../recovery/RemoteRecoveryTargetHandler.java | 11 ++++---- .../PeerRecoveryTargetServiceTests.java | 4 ++- .../recovery/RecoverySourceHandlerTests.java | 3 ++- .../indices/recovery/AsyncRecoveryTarget.java | 5 ++-- 8 files changed, 49 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 50abeb2fb7a..92f1558d71e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -531,11 +531,11 @@ public class PeerRecoveryTargetService implements IndexEventListener { @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() - )) { - recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, - request.phase1ExistingFileSizes, request.totalTranslogOps); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + final ActionListener listener = new ChannelActionListener<>(channel, Actions.FILES_INFO, request); + recoveryRef.target().receiveFileInfo( + request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes, + request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE)); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 959c13ccb89..9f5148f0232 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -406,17 +406,25 @@ public class RecoverySourceHandler { logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); - cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo( - phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt())); - sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps); + final StepListener sendFileInfoStep = new StepListener<>(); + final StepListener cleanFilesStep = new StepListener<>(); + cancellableThreads.execute(() -> + recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, + phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep)); + + sendFileInfoStep.whenComplete(r -> { + sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps); + cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep); + }, listener::onFailure); + final long totalSize = totalSizeInBytes; final long existingTotalSize = existingTotalSizeInBytes; - cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> { + cleanFilesStep.whenComplete(r -> { final TimeValue took = stopWatch.totalTime(); logger.trace("recovery [phase1]: took [{}]", took); - return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, - phase1ExistingFileSizes, existingTotalSize, took); - })); + listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, + phase1ExistingFileSizes, existingTotalSize, took)); + }, listener::onFailure); } else { logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", recoverySourceMetadata.getSyncId()); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 55aa5b22595..9be57296cdf 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -378,17 +378,20 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, - int totalTranslogOps) { - final RecoveryState.Index index = state().getIndex(); - for (int i = 0; i < phase1ExistingFileNames.size(); i++) { - index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true); - } - for (int i = 0; i < phase1FileNames.size(); i++) { - index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false); - } - state().getTranslog().totalOperations(totalTranslogOps); - state().getTranslog().totalOperationsOnStart(totalTranslogOps); - + int totalTranslogOps, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final RecoveryState.Index index = state().getIndex(); + for (int i = 0; i < phase1ExistingFileNames.size(); i++) { + index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true); + } + for (int i = 0; i < phase1FileNames.size(); i++) { + index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false); + } + state().getTranslog().totalOperations(totalTranslogOps); + state().getTranslog().totalOperationsOnStart(totalTranslogOps); + return null; + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 89f4cb22c2b..9be076bc732 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -88,7 +88,8 @@ public interface RecoveryTargetHandler { List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, - int totalTranslogOps); + int totalTranslogOps, + ActionListener listener); /** * After all source files has been sent over, this command is sent to the target so it can clean any local diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 6b786fdae4d..bb5457c1a3d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -129,14 +129,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { @Override public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, - List phase1ExistingFileSizes, int totalTranslogOps) { - + List phase1ExistingFileSizes, int totalTranslogOps, ActionListener listener) { RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId, - phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); + phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest, - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); - + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), + in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 5a6d7fbaa17..1154ce99078 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -145,11 +145,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null); + final PlainActionFuture receiveFileInfoFuture = new PlainActionFuture<>(); recoveryTarget.receiveFileInfo( mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()), mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()), - Collections.emptyList(), Collections.emptyList(), 0 + Collections.emptyList(), Collections.emptyList(), 0, receiveFileInfoFuture ); + receiveFileInfoFuture.actionGet(); List requests = new ArrayList<>(); for (StoreFileMetaData md : mdFiles) { try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index b69033ba9b4..215bf475a0c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -753,7 +753,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { @Override public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, - List phase1ExistingFileSizes, int totalTranslogOps) { + List phase1ExistingFileSizes, int totalTranslogOps, ActionListener listener) { + } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java index cf2b768f46d..afd2aa4e858 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -69,8 +69,9 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { @Override public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, - List phase1ExistingFileSizes, int totalTranslogOps) { - target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); + List phase1ExistingFileSizes, int totalTranslogOps, ActionListener listener) { + executor.execute(() -> target.receiveFileInfo( + phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps, listener)); } @Override