Make peer recovery send file info step async (#43792)

Relates #36195
This commit is contained in:
Nhat Nguyen 2019-06-30 17:47:38 -04:00
parent 9aa6f7c434
commit 598e00a689
8 changed files with 49 additions and 34 deletions

View File

@ -531,11 +531,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
request.phase1ExistingFileSizes, request.totalTranslogOps);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILES_INFO, request);
recoveryRef.target().receiveFileInfo(
request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes,
request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
}

View File

@ -406,17 +406,25 @@ public class RecoverySourceHandler {
logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
final StepListener<Void> sendFileInfoStep = new StepListener<>();
final StepListener<Void> cleanFilesStep = new StepListener<>();
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
sendFileInfoStep.whenComplete(r -> {
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep);
}, listener::onFailure);
final long totalSize = totalSizeInBytes;
final long existingTotalSize = existingTotalSizeInBytes;
cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
cleanFilesStep.whenComplete(r -> {
final TimeValue took = stopWatch.totalTime();
logger.trace("recovery [phase1]: took [{}]", took);
return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
phase1ExistingFileSizes, existingTotalSize, took);
}));
listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
phase1ExistingFileSizes, existingTotalSize, took));
}, listener::onFailure);
} else {
logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
recoverySourceMetadata.getSyncId());

View File

@ -378,17 +378,20 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes,
int totalTranslogOps) {
final RecoveryState.Index index = state().getIndex();
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < phase1FileNames.size(); i++) {
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
}
state().getTranslog().totalOperations(totalTranslogOps);
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
int totalTranslogOps,
ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Index index = state().getIndex();
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < phase1FileNames.size(); i++) {
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
}
state().getTranslog().totalOperations(totalTranslogOps);
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
return null;
});
}
@Override

View File

@ -88,7 +88,8 @@ public interface RecoveryTargetHandler {
List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes,
int totalTranslogOps);
int totalTranslogOps,
ActionListener<Void> listener);
/**
* After all source files has been sent over, this command is sent to the target so it can clean any local

View File

@ -129,14 +129,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
@Override

View File

@ -145,11 +145,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null);
final PlainActionFuture<Void> receiveFileInfoFuture = new PlainActionFuture<>();
recoveryTarget.receiveFileInfo(
mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()),
mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()),
Collections.emptyList(), Collections.emptyList(), 0
Collections.emptyList(), Collections.emptyList(), 0, receiveFileInfoFuture
);
receiveFileInfoFuture.actionGet();
List<RecoveryFileChunkRequest> requests = new ArrayList<>();
for (StoreFileMetaData md : mdFiles) {
try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {

View File

@ -753,7 +753,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
}
@Override

View File

@ -69,8 +69,9 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
executor.execute(() -> target.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps, listener));
}
@Override