Reestablish peer recovery after network errors (#57827)

Currently a network disruption will fail a peer recovery. This commit
adds network errors as retryable actions for the source node.
Additionally, it adds sequence numbers to the recovery request to
ensure that the requests are idempotent.

Additionally it adds a reestablish recovery action. The target node
will attempt to reestablish an existing recovery after a network
failure. This is necessary to ensure that the retries occurring on the
source node provide value in bidirectional failures.
This commit is contained in:
Tim Brooks 2020-06-08 14:17:52 -06:00 committed by GitHub
parent d5522c2747
commit 952cf770ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 760 additions and 195 deletions

View File

@ -438,8 +438,8 @@ public class CorruptedFileIT extends ESIntegTestCase {
if (truncate && req.length() > 1) {
BytesRef bytesRef = req.content().toBytesRef();
BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(),
array, req.lastChunk(), req.totalTranslogOps(), req.sourceThrottleTimeInNanos());
request = new RecoveryFileChunkRequest(req.recoveryId(), req.requestSeqNo(), req.shardId(), req.metadata(),
req.position(), array, req.lastChunk(), req.totalTranslogOps(), req.sourceThrottleTimeInNanos());
} else {
assert req.content().toBytesRef().bytes == req.content().toBytesRef().bytes : "no internal reference!!";
final byte[] array = req.content().toBytesRef().bytes;

View File

@ -727,7 +727,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "360s")
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "500ms")
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), "10s")
.build();
// start a master node
@ -798,7 +799,15 @@ public class IndexRecoveryIT extends ESIntegTestCase {
validator.accept(action, request);
connection.sendRequest(requestId, action, request, options);
});
TransientReceiveRejected handlingBehavior = new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted);
Runnable connectionBreaker = () -> {
// Always break connection from source to remote to ensure that actions are retried
blueTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
if (randomBoolean()) {
// Sometimes break connection from remote to source to ensure that recovery is re-established
redTransportService.disconnectFromNode(blueTransportService.getLocalDiscoNode());
}
};
TransientReceiveRejected handlingBehavior = new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted, connectionBreaker);
redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, handlingBehavior);
try {
@ -822,11 +831,13 @@ public class IndexRecoveryIT extends ESIntegTestCase {
private final String actionName;
private final AtomicBoolean recoveryStarted;
private final Runnable connectionBreaker;
private final AtomicInteger blocksRemaining;
private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted) {
private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted, Runnable connectionBreaker) {
this.actionName = actionName;
this.recoveryStarted = recoveryStarted;
this.connectionBreaker = connectionBreaker;
this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3));
}
@ -835,11 +846,21 @@ public class IndexRecoveryIT extends ESIntegTestCase {
Task task) throws Exception {
recoveryStarted.set(true);
if (blocksRemaining.getAndUpdate(i -> i == 0 ? 0 : i - 1) != 0) {
String rejected = "rejected";
String circuit = "circuit";
String network = "network";
String reason = randomFrom(rejected, circuit, network);
if (reason.equals(rejected)) {
logger.info("--> preventing {} response by throwing exception", actionName);
if (randomBoolean()) {
throw new EsRejectedExecutionException();
} else {
} else if (reason.equals(circuit)) {
logger.info("--> preventing {} response by throwing exception", actionName);
throw new CircuitBreakingException("Broken", CircuitBreaker.Durability.PERMANENT);
} else if (reason.equals(network)) {
logger.info("--> preventing {} response by breaking connection", actionName);
connectionBreaker.run();
} else {
throw new AssertionError("Unknown failure reason: " + reason);
}
}
handler.messageReceived(request, channel, task);

View File

@ -1041,7 +1041,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.ingest.IngestProcessorException.class,
org.elasticsearch.ingest.IngestProcessorException::new,
157,
Version.V_7_5_0);
Version.V_7_5_0),
PEER_RECOVERY_NOT_FOUND_EXCEPTION(
org.elasticsearch.indices.recovery.PeerRecoveryNotFound.class,
org.elasticsearch.indices.recovery.PeerRecoveryNotFound::new,
158,
Version.V_7_9_0);
final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
public class PeerRecoveryNotFound extends ResourceNotFoundException {
public PeerRecoveryNotFound(final long recoveryId, final ShardId shardId, final String targetAllocationId) {
super("Peer recovery for " + shardId + " with [recoveryId: " + recoveryId + ", targetAllocationId: " + targetAllocationId
+ "] not found.");
}
public PeerRecoveryNotFound(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
@ -59,6 +60,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
public static class Actions {
public static final String START_RECOVERY = "internal:index/shard/recovery/start_recovery";
public static final String REESTABLISH_RECOVERY = "internal:index/shard/recovery/reestablish_recovery";
}
private final TransportService transportService;
@ -72,8 +74,16 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
// When the target node wants to start a peer recovery it sends a START_RECOVERY request to the source
// node. Upon receiving START_RECOVERY, the source node will initiate the peer recovery.
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
// When the target node's START_RECOVERY request has failed due to a network disconnection, it will
// send a REESTABLISH_RECOVERY. This attempts to reconnect to an existing recovery process taking
// place on the source node. If the recovery process no longer exists, then the REESTABLISH_RECOVERY
// action will fail and the target node will send a new START_RECOVERY request.
transportService.registerRequestHandler(Actions.REESTABLISH_RECOVERY, ThreadPool.Names.GENERIC, ReestablishRecoveryRequest::new,
new ReestablishRecoveryTransportRequestHandler());
}
@Override
@ -120,6 +130,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
}
private void reestablish(ReestablishRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());
logger.trace("[{}][{}] reestablishing recovery {}", request.shardId().getIndex().getName(), request.shardId().id(),
request.recoveryId());
ongoingRecoveries.reestablishRecovery(request, shard, listener);
}
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
@ -127,6 +146,13 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
}
}
class ReestablishRecoveryTransportRequestHandler implements TransportRequestHandler<ReestablishRecoveryRequest> {
@Override
public void messageReceived(final ReestablishRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
reestablish(request, new ChannelActionListener<>(channel, Actions.REESTABLISH_RECOVERY, request));
}
}
// exposed for testing
final int numberOfOngoingRecoveries() {
return ongoingRecoveries.ongoingRecoveries.size();
@ -147,6 +173,16 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
return handler;
}
synchronized void reestablishRecovery(ReestablishRecoveryRequest request, IndexShard shard,
ActionListener<RecoveryResponse> listener) {
assert lifecycle.started();
final ShardRecoveryContext shardContext = ongoingRecoveries.get(shard);
if (shardContext == null) {
throw new PeerRecoveryNotFound(request.recoveryId(), request.shardId(), request.targetAllocationId());
}
shardContext.reestablishRecovery(request, listener);
}
synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
@ -218,6 +254,25 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
return handler;
}
/**
* Adds recovery source handler.
*/
synchronized void reestablishRecovery(ReestablishRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
RecoverySourceHandler handler = null;
for (RecoverySourceHandler existingHandler : recoveryHandlers) {
if (existingHandler.getRequest().recoveryId() == request.recoveryId() &&
existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
handler = existingHandler;
break;
}
}
if (handler == null) {
throw new ResourceNotFoundException("Cannot reestablish recovery, recovery id [" + request.recoveryId()
+ "] not found.");
}
handler.addListener(listener);
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =

View File

@ -27,6 +27,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
@ -34,6 +35,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
@ -44,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -58,6 +61,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
@ -159,8 +163,16 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
}
private void doRecovery(final long recoveryId) {
final StartRecoveryRequest request;
protected void reestablishRecovery(final StartRecoveryRequest request, final String reason, TimeValue retryAfter) {
final long recoveryId = request.recoveryId();
logger.trace("will try to reestablish recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
threadPool.schedule(new RecoveryRunner(recoveryId, request), retryAfter, ThreadPool.Names.GENERIC);
}
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
final String actionName;
final TransportRequest requestToSend;
final StartRecoveryRequest startRequest;
final RecoveryState.Timer timer;
CancellableThreads cancellableThreads;
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
@ -171,6 +183,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
final RecoveryTarget recoveryTarget = recoveryRef.target();
timer = recoveryTarget.state().getTimer();
cancellableThreads = recoveryTarget.cancellableThreads();
if (preExistingRequest == null) {
try {
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
@ -180,7 +193,9 @@ public class PeerRecoveryTargetService implements IndexEventListener {
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
} catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
@ -188,128 +203,29 @@ public class PeerRecoveryTargetService implements IndexEventListener {
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
return;
}
logger.trace("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());
} else {
startRequest = preExistingRequest;
requestToSend = new ReestablishRecoveryRequest(recoveryId, startRequest.shardId(), startRequest.targetAllocationId());
actionName = PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY;
logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
}
Consumer<Exception> handleException = e -> {
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage(
"[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(),
request.shardId().id()), e);
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request,
"source has canceled the recovery", cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
cause instanceof ShardNotFoundException) {
// if the target is not ready yet, retry
retryRecovery(
recoveryId,
"remote shard not ready",
recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
};
RecoveryResponseHandler responseHandler = new RecoveryResponseHandler(startRequest, timer);
try {
logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
cancellableThreads.executeIO(() ->
// we still execute under cancelableThreads here to ensure we interrupt any blocking call to the network if any
// on the underlying transport. It's unclear if we need this here at all after moving to async execution but
// the issues that a missing call to this could cause are sneaky and hard to debug. If we don't need it on this
// call we can potentially remove it altogether which we should do it in a major release only with enough
// time to test. This shoudl be done for 7.0 if possible
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
new TransportResponseHandler<RecoveryResponse>() {
@Override
public void handleResponse(RecoveryResponse recoveryResponse) {
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().getIndex().getName()).append(']')
.append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime)
.append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]")
.append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
.append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size())
.append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize))
.append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]")
.append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
logger.trace("{}", sb);
} else {
logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(),
recoveryTime);
}
}
@Override
public void handleException(TransportException e) {
handleException.accept(e);
}
@Override
public String executor() {
// we do some heavy work like refreshes in the response so fork off to the generic threadpool
return ThreadPool.Names.GENERIC;
}
@Override
public RecoveryResponse read(StreamInput in) throws IOException {
return new RecoveryResponse(in);
}
})
transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend, responseHandler)
);
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Exception e) {
handleException.accept(e);
responseHandler.onException(e);
}
}
@ -381,9 +297,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
}
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), listener);
}
}
}
@ -393,9 +312,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
if (listener == null) {
return;
}
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
}
}
}
@ -420,10 +342,14 @@ public class PeerRecoveryTargetService implements IndexEventListener {
Task task) throws IOException {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
final ActionListener<RecoveryTranslogOperationsResponse> listener =
new ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request,
nullVal -> new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
if (listener == null) {
return;
}
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final Consumer<Exception> retryOnMappingException = exception -> {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
@ -464,7 +390,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
request.retentionLeases(),
request.mappingVersionOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
checkpoint -> listener.onResponse(null),
e -> {
// do not retry if the mapping on replica is at least as recent as the mapping
// that the primary used to index the operations in the request.
@ -484,10 +410,14 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILES_INFO, request);
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
}
recoveryRef.target().receiveFileInfo(
request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes,
request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
request.totalTranslogOps, listener);
}
}
}
@ -497,9 +427,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.CLEAN_FILES, request);
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
}
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
listener);
}
}
}
@ -513,6 +447,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
}
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
@ -529,19 +468,47 @@ public class PeerRecoveryTargetService implements IndexEventListener {
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),
request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
request.totalTranslogOps(), listener);
}
}
}
private ActionListener<Void> createOrFinishListener(final RecoveryRef recoveryRef, final TransportChannel channel,
final String action, final RecoveryTransportRequest request) {
return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
}
private ActionListener<Void> createOrFinishListener(final RecoveryRef recoveryRef, final TransportChannel channel,
final String action, final RecoveryTransportRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);
final long requestSeqNo = request.requestSeqNo();
final ActionListener<Void> listener;
if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener);
} else {
listener = voidListener;
}
return listener;
}
class RecoveryRunner extends AbstractRunnable {
final long recoveryId;
private final StartRecoveryRequest startRecoveryRequest;
RecoveryRunner(long recoveryId) {
this(recoveryId, null);
}
RecoveryRunner(long recoveryId, StartRecoveryRequest startRecoveryRequest) {
this.recoveryId = recoveryId;
this.startRecoveryRequest = startRecoveryRequest;
}
@Override
@ -562,7 +529,132 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void doRun() {
doRecovery(recoveryId);
doRecovery(recoveryId, startRecoveryRequest);
}
}
private class RecoveryResponseHandler implements TransportResponseHandler<RecoveryResponse> {
private final long recoveryId;
private final StartRecoveryRequest request;
private final RecoveryState.Timer timer;
private RecoveryResponseHandler(final StartRecoveryRequest request, final RecoveryState.Timer timer) {
this.recoveryId = request.recoveryId();
this.request = request;
this.timer = timer;
}
@Override
public void handleResponse(RecoveryResponse recoveryResponse) {
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().getIndex().getName()).append(']')
.append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime)
.append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]")
.append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
.append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size())
.append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize))
.append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]")
.append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
logger.trace("{}", sb);
} else {
logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(),
recoveryTime);
}
}
@Override
public void handleException(TransportException e) {
onException(e);
}
private void onException(Exception e) {
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage(
"[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(),
request.shardId().id()), e);
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request,
"source has canceled the recovery", cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
cause instanceof ShardNotFoundException) {
// if the target is not ready yet, retry
retryRecovery(
recoveryId,
"remote shard not ready",
recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
// PeerRecoveryNotFound is returned when the source node cannot find the recovery requested by
// the REESTABLISH_RECOVERY request. In this case, we delay and then attempt to restart.
if (cause instanceof DelayRecoveryException || cause instanceof PeerRecoveryNotFound) {
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
recoverySettings.retryDelayNetwork(), cause.getMessage());
if (request.sourceNode().getVersion().onOrAfter(Version.V_7_9_0)) {
reestablishRecovery(request, cause.getMessage(), recoverySettings.retryDelayNetwork());
} else {
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
recoverySettings.activityTimeout());
}
return;
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
}
@Override
public String executor() {
// we do some heavy work like refreshes in the response so fork off to the generic threadpool
return ThreadPool.Names.GENERIC;
}
@Override
public RecoveryResponse read(StreamInput in) throws IOException {
return new RecoveryResponse(in);
}
}
}

View File

@ -25,11 +25,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public class RecoveryCleanFilesRequest extends TransportRequest {
public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {
private final long recoveryId;
private final ShardId shardId;
@ -37,8 +36,9 @@ public class RecoveryCleanFilesRequest extends TransportRequest {
private final int totalTranslogOps;
private final long globalCheckpoint;
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.snapshotFiles = snapshotFiles;

View File

@ -26,20 +26,19 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public final class RecoveryFileChunkRequest extends TransportRequest {
private boolean lastChunk;
private long recoveryId;
private ShardId shardId;
private long position;
private BytesReference content;
private StoreFileMetadata metadata;
private long sourceThrottleTimeInNanos;
public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
private final boolean lastChunk;
private final long recoveryId;
private final ShardId shardId;
private final long position;
private final BytesReference content;
private final StoreFileMetadata metadata;
private final long sourceThrottleTimeInNanos;
private int totalTranslogOps;
private final int totalTranslogOps;
public RecoveryFileChunkRequest(StreamInput in) throws IOException {
super(in);
@ -58,8 +57,9 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
sourceThrottleTimeInNanos = in.readLong();
}
public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetadata metadata, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, long sourceThrottleTimeInNanos) {
public RecoveryFileChunkRequest(long recoveryId, final long requestSeqNo, ShardId shardId, StoreFileMetadata metadata, long position,
BytesReference content, boolean lastChunk, int totalTranslogOps, long sourceThrottleTimeInNanos) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.metadata = metadata;

View File

@ -22,13 +22,12 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class RecoveryFilesInfoRequest extends TransportRequest {
public class RecoveryFilesInfoRequest extends RecoveryTransportRequest {
private long recoveryId;
private ShardId shardId;
@ -70,8 +69,10 @@ public class RecoveryFilesInfoRequest extends TransportRequest {
totalTranslogOps = in.readVInt();
}
RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
RecoveryFilesInfoRequest(long recoveryId, long requestSeqNo, ShardId shardId, List<String> phase1FileNames,
List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
int totalTranslogOps) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.phase1FileNames = phase1FileNames;

View File

@ -24,11 +24,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
final class RecoveryFinalizeRecoveryRequest extends TransportRequest {
final class RecoveryFinalizeRecoveryRequest extends RecoveryTransportRequest {
private final long recoveryId;
private final ShardId shardId;
@ -47,7 +46,9 @@ final class RecoveryFinalizeRecoveryRequest extends TransportRequest {
}
}
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint, final long trimAboveSeqNo) {
RecoveryFinalizeRecoveryRequest(final long recoveryId, final long requestSeqNo, final ShardId shardId,
final long globalCheckpoint, final long trimAboveSeqNo) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.globalCheckpoint = globalCheckpoint;

View File

@ -24,17 +24,17 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
class RecoveryPrepareForTranslogOperationsRequest extends RecoveryTransportRequest {
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, long requestSeqNo, ShardId shardId, int totalTranslogOps) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
public class RecoveryRequestTracker {
private final Map<Long, ListenableFuture<Void>> ongoingRequests = Collections.synchronizedMap(new HashMap<>());
private final LocalCheckpointTracker checkpointTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
/**
* This method will mark that a request with a unique sequence number has been received. If this is the
* first time the unique request has been received, this method will return a listener to be completed.
* The caller should then perform the requested action and complete the returned listener.
*
*
* If the unique request has already been received, this method will either complete the provided listener
* or attach that listener to the listener returned in the first call. In this case, the method will
* return null and the caller should not perform the requested action as a prior caller is already
* performing the action.
*/
@Nullable
public synchronized ActionListener<Void> markReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
if (checkpointTracker.hasProcessed(requestSeqNo)) {
final ListenableFuture<Void> existingFuture = ongoingRequests.get(requestSeqNo);
if (existingFuture != null) {
existingFuture.addListener(listener, EsExecutors.newDirectExecutorService());
} else {
listener.onResponse(null);
}
return null;
} else {
checkpointTracker.markSeqNoAsProcessed(requestSeqNo);
final ListenableFuture<Void> future = new ListenableFuture<>();
ongoingRequests.put(requestSeqNo, future);
future.addListener(new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
ongoingRequests.remove(requestSeqNo);
listener.onResponse(v);
}
@Override
public void onFailure(Exception e) {
// We do not remove the future to cache the error for retried requests
listener.onFailure(e);
}
}, EsExecutors.newDirectExecutorService());
return future;
}
}
}

View File

@ -51,8 +51,10 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
@ -114,6 +116,7 @@ public class RecoverySourceHandler {
private final ThreadPool threadPool;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool,
StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
@ -132,12 +135,16 @@ public class RecoverySourceHandler {
return request;
}
public void addListener(ActionListener<RecoveryResponse> listener) {
future.addListener(listener, EsExecutors.newDirectExecutorService());
}
/**
* performs the recovery from the local engine to the target
*/
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
addListener(listener);
final Closeable releaseResources = () -> IOUtils.close(resources);
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
try {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
final RuntimeException e;
@ -149,12 +156,12 @@ public class RecoverySourceHandler {
if (beforeCancelEx != null) {
e.addSuppressed(beforeCancelEx);
}
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
};
final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
@ -336,13 +343,13 @@ public class RecoverySourceHandler {
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try {
wrappedListener.onResponse(response);
future.onResponse(response);
} finally {
IOUtils.close(resources);
}
}, onFailure);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
}
}

View File

@ -73,6 +73,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
@ -120,6 +121,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
return new RecoveryTarget(indexShard, sourceNode, listener);
}
public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
}
public long recoveryId() {
return recoveryId;
}

View File

@ -27,12 +27,11 @@ import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.List;
public class RecoveryTranslogOperationsRequest extends TransportRequest {
public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest {
private final long recoveryId;
private final ShardId shardId;
@ -45,6 +44,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
RecoveryTranslogOperationsRequest(
final long recoveryId,
final long requestSeqNo,
final ShardId shardId,
final List<Translog.Operation> operations,
final int totalTranslogOps,
@ -52,6 +52,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public abstract class RecoveryTransportRequest extends TransportRequest {
private final long requestSeqNo;
RecoveryTransportRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
requestSeqNo = in.readLong();
} else {
requestSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
RecoveryTransportRequest(long requestSeqNo) {
this.requestSeqNo = requestSeqNo;
}
public long requestSeqNo() {
return requestSeqNo;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_7_9_0)) {
out.writeLong(requestSeqNo);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
/**
* Represents a request for starting a peer recovery.
*/
public class ReestablishRecoveryRequest extends TransportRequest {
private final long recoveryId;
private final ShardId shardId;
private final String targetAllocationId;
public ReestablishRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
targetAllocationId = in.readString();
}
public ReestablishRecoveryRequest(final long recoveryId, final ShardId shardId, final String targetAllocationId) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetAllocationId = targetAllocationId;
}
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
public String targetAllocationId() {
return targetAllocationId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(targetAllocationId);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.RetryableAction;
@ -42,9 +43,10 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -71,8 +73,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final TransportRequestOptions fileChunkRequestOptions;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final Consumer<Long> onSourceThrottle;
private final boolean retriesSupported;
private volatile boolean isCancelled = false;
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
@ -92,13 +96,15 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
this.retriesSupported = targetNode.getVersion().onOrAfter(Version.V_7_9_0);
}
@Override
public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final RecoveryPrepareForTranslogOperationsRequest request =
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps);
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, requestSeqNo, shardId, totalTranslogOps);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
@ -109,8 +115,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
@Override
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.FINALIZE;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final RecoveryFinalizeRecoveryRequest request =
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo);
new RecoveryFinalizeRecoveryRequest(recoveryId, requestSeqNo, shardId, globalCheckpoint, trimAboveSeqNo);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
@ -138,8 +145,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
final String action = PeerRecoveryTargetService.Actions.TRANSLOG_OPS;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId,
requestSeqNo,
shardId,
operations,
totalTranslogOps,
@ -156,7 +165,8 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.FILES_INFO;
RecoveryFilesInfoRequest request = new RecoveryFilesInfoRequest(recoveryId, shardId, phase1FileNames, phase1FileSizes,
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
RecoveryFilesInfoRequest request = new RecoveryFilesInfoRequest(recoveryId, requestSeqNo, shardId, phase1FileNames, phase1FileSizes,
phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
@ -169,8 +179,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata,
ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.CLEAN_FILES;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final RecoveryCleanFilesRequest request =
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint);
new RecoveryCleanFilesRequest(recoveryId, requestSeqNo, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
@ -204,12 +215,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
/* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
recoveryId, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
recoveryId, requestSeqNo, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader);
}
@ -230,7 +242,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
});
}
private <T extends TransportResponse> void executeRetryableAction(String action, TransportRequest request,
private <T extends TransportResponse> void executeRetryableAction(String action, RecoveryTransportRequest request,
TransportRequestOptions options, ActionListener<T> actionListener,
Writeable.Reader<T> reader) {
final Object key = new Object();
@ -247,7 +259,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
@Override
public boolean shouldRetry(Exception e) {
return retryableException(e);
return retriesSupported && retryableException(e);
}
};
onGoingRetryableActions.put(key, retryableAction);
@ -258,7 +270,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
private static boolean retryableException(Exception e) {
if (e instanceof RemoteTransportException) {
if (e instanceof ConnectTransportException) {
return true;
} else if (e instanceof SendRequestTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof ConnectTransportException;
} else if (e instanceof RemoteTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof CircuitBreakingException ||
cause instanceof EsRejectedExecutionException;

View File

@ -69,6 +69,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.PeerRecoveryNotFound;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.elasticsearch.ingest.IngestProcessorException;
import org.elasticsearch.repositories.RepositoryException;
@ -826,6 +827,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(155, ShardNotInPrimaryModeException.class);
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
ids.put(157, IngestProcessorException.class);
ids.put(158, PeerRecoveryNotFound.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -89,6 +89,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
);
receiveFileInfoFuture.actionGet();
List<RecoveryFileChunkRequest> requests = new ArrayList<>();
long seqNo = 0;
for (StoreFileMetadata md : mdFiles) {
try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {
int pos = 0;
@ -96,7 +97,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
int length = between(1, Math.toIntExact(md.length() - pos));
byte[] buffer = new byte[length];
in.readBytes(buffer, 0, length);
requests.add(new RecoveryFileChunkRequest(0, sourceShard.shardId(), md, pos, new BytesArray(buffer),
requests.add(new RecoveryFileChunkRequest(0, seqNo++, sourceShard.shardId(), md, pos, new BytesArray(buffer),
pos + length == md.length(), 1, 1));
pos += length;
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
public class RecoveryRequestTrackerTests extends ESTestCase {
private TestThreadPool threadPool;
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getTestName());
}
@Override
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testIdempotencyIsEnforced() {
Set<Long> seqNosReturned = ConcurrentCollections.newConcurrentSet();
ConcurrentMap<Long, Set<PlainActionFuture<Void>>> seqToResult = ConcurrentCollections.newConcurrentMap();
RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
int numberOfRequests = randomIntBetween(100, 200);
for (int j = 0; j < numberOfRequests; ++j) {
final long seqNo = j;
int iterations = randomIntBetween(2, 5);
for (int i = 0; i < iterations; ++i) {
threadPool.generic().execute(() -> {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
ActionListener<Void> listener = requestTracker.markReceivedAndCreateListener(seqNo, future);
Set<PlainActionFuture<Void>> set = seqToResult.computeIfAbsent(seqNo, (k) -> ConcurrentCollections.newConcurrentSet());
set.add(future);
if (listener != null) {
boolean added = seqNosReturned.add(seqNo);
// Ensure that we only return 1 future per sequence number
assertTrue(added);
if (rarely()) {
listener.onFailure(new Exception());
} else {
listener.onResponse(null);
}
}
});
}
}
seqToResult.values().stream().flatMap(Collection::stream).forEach(f -> {
try {
f.actionGet();
} catch (Exception e) {
// Ignore for now. We will assert later.
}
});
for (Set<PlainActionFuture<Void>> value : seqToResult.values()) {
Optional<PlainActionFuture<Void>> first = value.stream().findFirst();
assertTrue(first.isPresent());
Exception expectedException = null;
try {
first.get().actionGet();
} catch (Exception e) {
expectedException = e;
}
for (PlainActionFuture<Void> future : value) {
assertTrue(future.isDone());
if (expectedException == null) {
future.actionGet();
} else {
try {
future.actionGet();
fail("expected exception");
} catch (Exception e) {
assertSame(e, expectedException);
}
}
}
}
}
}