Make finalize step of recovery source non-blocking (#37388)

Relates #37291
This commit is contained in:
Nhat Nguyen 2019-01-14 18:20:54 -05:00 committed by GitHub
parent 36a3b84fc9
commit 397f315f56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 102 additions and 48 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedSupplier;
import java.util.ArrayList;
import java.util.List;
@ -180,4 +181,16 @@ public interface ActionListener<Response> {
}
};
}
/**
* Completes the given listener with the result from the provided supplier accordingly.
* This method is mainly used to complete a listener with a block of synchronous code.
*/
static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier) {
try {
listener.onResponse(supplier.get());
} catch (Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -37,10 +37,16 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i
private final ActionListener<? super Response> listener;
private final Writeable.Reader<Response> reader;
private final String executor;
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader, String executor) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}
@Override
@ -55,7 +61,7 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i
@Override
public String executor() {
return ThreadPool.Names.SAME;
return executor;
}
@Override

View File

@ -445,11 +445,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
@ -71,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@ -137,6 +139,9 @@ public class RecoverySourceHandler {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e ->
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
@ -235,16 +240,21 @@ public class RecoverySourceHandler {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
IOUtils.close(resources);
wrappedListener.onResponse(
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
);
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try {
wrappedListener.onResponse(response);
} finally {
IOUtils.close(resources);
}
}, onFailure);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
}
@ -585,10 +595,7 @@ public class RecoverySourceHandler {
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
}
/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -604,21 +611,26 @@ public class RecoverySourceHandler {
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// TODO: make relocated async
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
listener.onResponse(null);
}, listener::onFailure);
}
static final class SendSnapshotResult {

View File

@ -372,12 +372,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
return null;
});
}
@Override

View File

@ -42,8 +42,9 @@ public interface RecoveryTargetHandler {
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param listener the listener which will be notified when this method is completed
*/
void finalizeRecovery(long globalCheckpoint) throws IOException;
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available

View File

@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
@ -85,11 +86,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void finalizeRecovery(final long globalCheckpoint) {
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
@Override

View File

@ -18,16 +18,20 @@
*/
package org.elasticsearch.action;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class ActionListenerTests extends ESTestCase {
@ -201,4 +205,16 @@ public class ActionListenerTests extends ESTestCase {
assertThat(onFailureTimes.get(), equalTo(1));
}
}
public void testCompleteWith() {
PlainActionFuture<Integer> onResponseListener = new PlainActionFuture<>();
ActionListener.completeWith(onResponseListener, () -> 100);
assertThat(onResponseListener.isDone(), equalTo(true));
assertThat(onResponseListener.actionGet(), equalTo(100));
PlainActionFuture<Integer> onFailureListener = new PlainActionFuture<>();
ActionListener.completeWith(onFailureListener, () -> { throw new IOException("not found"); });
assertThat(onFailureListener.isDone(), equalTo(true));
assertThat(expectThrows(ExecutionException.class, onFailureListener::get).getCause(), instanceOf(IOException.class));
}
}

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
@ -847,13 +848,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
if (hasBlocked() == false) {
// it maybe that not ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
super.finalizeRecovery(globalCheckpoint);
super.finalizeRecovery(globalCheckpoint, listener);
}
}

View File

@ -2524,9 +2524,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
}
}, false, true);

View File

@ -686,7 +686,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
public void finalizeRecovery(long globalCheckpoint) {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
}
@Override