Make prepare engine step of recovery source non-blocking (#37573)

Relates #37174
This commit is contained in:
Nhat Nguyen 2019-01-21 21:35:10 -05:00 committed by GitHub
parent ca4b5861c8
commit 7394892b4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 84 additions and 73 deletions

View File

@@ -431,13 +431,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> { class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override @Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
Task task) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() final ActionListener<TransportResponse> listener =
)) { new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps()); recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
} }
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }
} }

View File

@@ -197,51 +197,51 @@ public class RecoverySourceHandler {
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]"; + startingSeqNo + "]";
final TimeValue prepareEngineTime; final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
try { // For a sequence based recovery, the target can keep its local translog
// For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
/*
* add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
* This means that any document indexed into the primary after this will be replicated to this replica as well
* make sure to do this before sampling the max sequence number in the next step, to ensure that we send
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
if (logger.isTraceEnabled()) {
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock);
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>(); final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, prepareEngineStep.whenComplete(prepareEngineTime -> {
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); /*
sendSnapshotStep.whenComplete( * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
r -> IOUtils.close(phase2Snapshot), * This means that any document indexed into the primary after this will be replicated to this replica as well
e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e))); * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
if (logger.isTraceEnabled()) {
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
retentionLock.close();
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
IOUtils.closeWhileHandlingException(phase2Snapshot);
onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
});
}, onFailure);
final StepListener<Void> finalizeStep = new StepListener<>(); final StepListener<Void> finalizeStep = new StepListener<>();
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure); sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
@@ -251,7 +251,7 @@ public class RecoverySourceHandler {
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try { try {
wrappedListener.onResponse(response); wrappedListener.onResponse(response);
} finally { } finally {
@@ -484,16 +484,21 @@ public class RecoverySourceHandler {
} }
} }
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog"); final ActionListener<Void> wrappedListener = ActionListener.wrap(
nullVal -> {
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
listener.onResponse(tookTime);
},
e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes. // garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps)); logger.trace("recovery [phase1]: prepare remote engine for translog");
stopWatch.stop(); cancellableThreads.execute(() ->
final TimeValue tookTime = stopWatch.totalTime(); recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
return tookTime;
} }
/** /**

View File

@@ -366,9 +366,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
/*** Implementation of {@link RecoveryTargetHandler } */ /*** Implementation of {@link RecoveryTargetHandler } */
@Override @Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
state().getTranslog().totalOperations(totalTranslogOps); ActionListener.completeWith(listener, () -> {
indexShard().openEngineAndSkipTranslogRecovery(); state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
return null;
});
} }
@Override @Override

View File

@@ -35,7 +35,7 @@ public interface RecoveryTargetHandler {
* @param fileBasedRecovery whether or not this call is part of an file based recovery * @param fileBasedRecovery whether or not this call is part of an file based recovery
* @param totalTranslogOps total translog operations expected to be sent * @param totalTranslogOps total translog operations expected to be sent
*/ */
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException; void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
/** /**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

View File

@@ -77,11 +77,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
} }
@Override @Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
} }
@Override @Override

View File

@@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -198,13 +199,14 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
Future<Void> fut = shards.asyncRecoverReplica(replica, Future<Void> fut = shards.asyncRecoverReplica(replica,
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){ (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
@Override @Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
ActionListener<Void> listener) {
try { try {
indexedOnPrimary.await(); indexedOnPrimary.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new AssertionError(e); throw new AssertionError(e);
} }
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
} }
}); });
fut.get(); fut.get();

View File

@@ -2573,8 +2573,8 @@ public class IndexShardTests extends IndexShardTestCase {
}) { }) {
// we're only checking that listeners are called when the engine is open, before there is no point // we're only checking that listeners are called when the engine is open, before there is no point
@Override @Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
assertListenerCalled.accept(replica); assertListenerCalled.accept(replica);
} }

View File

@@ -491,9 +491,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
} }
@Override @Override
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
prepareTargetForTranslogCalled.set(true); prepareTargetForTranslogCalled.set(true);
return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps); super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
} }
@Override @Override
@@ -700,7 +700,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
class TestRecoveryTargetHandler implements RecoveryTargetHandler { class TestRecoveryTargetHandler implements RecoveryTargetHandler {
@Override @Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) { public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
} }
@Override @Override