Retry failed peer recovery due to transient errors (#55883)

Currently a failed peer recovery action will fail the recovery as a whole. This
includes cases where the recovery fails due to potentially short-lived,
transient issues such as rejected executions or circuit breaking errors.

This commit adds the concept of a retryable action. A retryable action
will be retried in the face of certain errors. The action is retried
after an exponentially increasing backoff period. After a defined timeout
period, the action will time out.

This commit only implements retries for responses that indicate the
target node has NOT executed the action.
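
As a rough sketch of how the new class is meant to be used (illustrative only, not code from this commit; sendRecoveryRequest is a hypothetical placeholder, and logger, threadPool and listener are assumed to be in scope):

RetryableAction<Void> action = new RetryableAction<Void>(
        logger, threadPool,
        TimeValue.timeValueMillis(200),    // initial delay; the backoff bound doubles on each retry
        TimeValue.timeValueMinutes(1),     // give up once this much time has elapsed
        listener) {                        // final ActionListener<Void>, completed exactly once

    @Override
    public void tryAction(ActionListener<Void> listener) {
        sendRecoveryRequest(listener);     // hypothetical call that may fail with a transient error
    }

    @Override
    public boolean shouldRetry(Exception e) {
        // for example, retry only rejections, which imply the target never executed the action
        return e instanceof EsRejectedExecutionException;
    }
};
action.run();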
Tim Brooks 2020-04-28 13:52:49 -06:00 committed by GitHub
parent 1c73fcfc86
commit cd228095df
16 changed files with 712 additions and 58 deletions


@ -0,0 +1,174 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An action that will be retried on failure if {@link RetryableAction#shouldRetry(Exception)} returns true.
* The executor on which the action will be executed can be defined in the constructor. Otherwise, SAME is the
* default. The action will be retried with exponentially increasing delay periods until the timeout period
* has been reached.
*/
public abstract class RetryableAction<Response> {
private final Logger logger;
private final AtomicBoolean isDone = new AtomicBoolean(false);
private final ThreadPool threadPool;
private final long initialDelayMillis;
private final long timeoutMillis;
private final long startMillis;
private final ActionListener<Response> finalListener;
private final String executor;
public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue,
ActionListener<Response> listener) {
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
}
public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue,
ActionListener<Response> listener, String executor) {
this.logger = logger;
this.threadPool = threadPool;
this.initialDelayMillis = initialDelay.getMillis();
if (initialDelayMillis < 1) {
throw new IllegalArgumentException("Initial delay was less than 1 millisecond: " + initialDelay);
}
this.timeoutMillis = Math.max(timeoutValue.getMillis(), 1);
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;
}
public void run() {
final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
final Runnable runnable = createRunnable(retryingListener);
threadPool.executor(executor).execute(runnable);
}
public void cancel(Exception e) {
if (isDone.compareAndSet(false, true)) {
finalListener.onFailure(e);
}
}
private Runnable createRunnable(RetryingListener retryingListener) {
return new ActionRunnable<Response>(retryingListener) {
@Override
protected void doRun() {
tryAction(listener);
}
@Override
public void onRejection(Exception e) {
// TODO: The only implementations of this class use SAME which means the execution will not be
// rejected. Future implementations can adjust this functionality as needed.
onFailure(e);
}
};
}
public abstract void tryAction(ActionListener<Response> listener);
public abstract boolean shouldRetry(Exception e);
private class RetryingListener implements ActionListener<Response> {
private static final int MAX_EXCEPTIONS = 4;
private final long delayMillisBound;
private ArrayDeque<Exception> caughtExceptions;
private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
this.delayMillisBound = delayMillisBound;
this.caughtExceptions = caughtExceptions;
}
@Override
public void onResponse(Response response) {
if (isDone.compareAndSet(false, true)) {
finalListener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
if (shouldRetry(e)) {
final long elapsedMillis = threadPool.relativeTimeInMillis() - startMillis;
if (elapsedMillis >= timeoutMillis) {
logger.debug(() -> new ParameterizedMessage("retryable action timed out after {}",
TimeValue.timeValueMillis(elapsedMillis)), e);
addException(e);
if (isDone.compareAndSet(false, true)) {
finalListener.onFailure(buildFinalException());
}
} else {
addException(e);
final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
threadPool.schedule(runnable, delay, executor);
}
}
} else {
addException(e);
if (isDone.compareAndSet(false, true)) {
finalListener.onFailure(buildFinalException());
}
}
}
private Exception buildFinalException() {
final Exception topLevel = caughtExceptions.removeFirst();
Exception suppressed;
while ((suppressed = caughtExceptions.pollFirst()) != null) {
topLevel.addSuppressed(suppressed);
}
return topLevel;
}
private void addException(Exception e) {
if (caughtExceptions != null) {
if (caughtExceptions.size() == MAX_EXCEPTIONS) {
caughtExceptions.removeLast();
}
} else {
caughtExceptions = new ArrayDeque<>(MAX_EXCEPTIONS);
}
caughtExceptions.addFirst(e);
}
}
}


@ -30,6 +30,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
@ -64,15 +65,17 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
private final TransportService transportService;
private final IndicesService indicesService;
private final RecoverySettings recoverySettings;
private final BigArrays bigArrays;
final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
@Inject
public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService,
RecoverySettings recoverySettings) {
RecoverySettings recoverySettings, BigArrays bigArrays) {
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.bigArrays = bigArrays;
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
}
@ -222,7 +225,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, bigArrays,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());


@ -63,6 +63,11 @@ public class RecoverySettings {
Setting.positiveTimeSetting("indices.recovery.internal_action_timeout", TimeValue.timeValueMinutes(15),
Property.Dynamic, Property.NodeScope);
/** timeout value to use for the retrying of requests made as part of the recovery process */
public static final Setting<TimeValue> INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING =
Setting.positiveTimeSetting("indices.recovery.internal_action_retry_timeout", TimeValue.timeValueMinutes(1),
Property.Dynamic, Property.NodeScope);
/**
* timeout value to use for requests made as part of the recovery process that are expected to take a long time.
* defaults to twice `indices.recovery.internal_action_timeout`.
@ -91,6 +96,7 @@ public class RecoverySettings {
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
@ -103,6 +109,7 @@ public class RecoverySettings {
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings);
this.internalActionRetryTimeout = INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.get(settings);
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
@ -146,6 +153,10 @@ public class RecoverySettings {
return internalActionTimeout;
}
public TimeValue internalActionRetryTimeout() {
return internalActionRetryTimeout;
}
public TimeValue internalActionLongTimeout() {
return internalActionLongTimeout;
}
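
Because the new setting is declared with Property.Dynamic and Property.NodeScope, it should be adjustable at runtime with an ordinary cluster settings update (assuming it is registered with the cluster settings, which this excerpt does not show). A minimal sketch, where client is assumed to be an org.elasticsearch.client.Client:

client.admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .put("indices.recovery.internal_action_retry_timeout", "30s"))  // default is 1m
    .get();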


@ -818,6 +818,7 @@ public class RecoverySourceHandler {
*/
public void cancel(String reason) {
cancellableThreads.cancel(reason);
recoveryTarget.cancel();
}
@Override


@ -106,4 +106,5 @@ public interface RecoveryTargetHandler {
void writeFileChunk(StoreFileMetadata fileMetadata, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener);
default void cancel() {}
}


@ -19,12 +19,24 @@
package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
@ -33,22 +45,30 @@ import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class);
private final TransportService transportService;
private final ThreadPool threadPool;
private final long recoveryId;
private final ShardId shardId;
private final BigArrays bigArrays;
private final DiscoveryNode targetNode;
private final RecoverySettings recoverySettings;
private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
private final TransportRequestOptions translogOpsRequestOptions;
private final TransportRequestOptions fileChunkRequestOptions;
@ -56,12 +76,15 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final Consumer<Long> onSourceThrottle;
private volatile boolean isCancelled = false;
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, BigArrays bigArrays,
DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
this.transportService = transportService;
this.threadPool = transportService.getThreadPool();
this.recoveryId = recoveryId;
this.shardId = shardId;
this.bigArrays = bigArrays;
this.targetNode = targetNode;
this.recoverySettings = recoverySettings;
this.onSourceThrottle = onSourceThrottle;
@ -73,25 +96,30 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
}
@Override
public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG;
final RecoveryPrepareForTranslogOperationsRequest request =
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
}
@Override
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
final String action = PeerRecoveryTargetService.Actions.FINALIZE;
final RecoveryFinalizeRecoveryRequest request =
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
}
@Override
@ -113,6 +141,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
final String action = PeerRecoveryTargetService.Actions.TRANSLOG_OPS;
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId,
shardId,
@ -122,30 +151,35 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
maxSeqNoOfDeletesOrUpdatesOnPrimary,
retentionLeases,
mappingVersionOnPrimary);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
final Writeable.Reader<RecoveryTranslogOperationsResponse> reader = RecoveryTranslogOperationsResponse::new;
final ActionListener<RecoveryTranslogOperationsResponse> responseListener = ActionListener.map(listener, r -> r.localCheckpoint);
executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
}
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
final String action = PeerRecoveryTargetService.Actions.FILES_INFO;
RecoveryFilesInfoRequest request = new RecoveryFilesInfoRequest(recoveryId, shardId, phase1FileNames, phase1FileSizes,
phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
}
@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata,
ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
final String action = PeerRecoveryTargetService.Actions.CLEAN_FILES;
final RecoveryCleanFilesRequest request =
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint);
final TransportRequestOptions options =
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
}
@Override
@ -173,15 +207,81 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
throttleTimeInNanos = 0;
}
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata, position, content, lastChunk,
totalTranslogOps,
/* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be to restart the file copy again (new deltas) if too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK;
final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(content.length(), bigArrays);
boolean actionStarted = false;
try {
content.writeTo(output);
/* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be to restart the file copy again (new deltas) if too many translog ops are piling up.
*/
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata,
position, output.bytes(), lastChunk, totalTranslogOps, throttleTimeInNanos);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
final ActionListener<TransportResponse.Empty> releaseListener = ActionListener.runBefore(responseListener, output::close);
executeRetryableAction(action, request, fileChunkRequestOptions, releaseListener, reader);
actionStarted = true;
} catch (IOException e) {
// Since the content data is buffered in memory, we should never get an exception.
throw new AssertionError(e);
} finally {
if (actionStarted == false) {
output.close();
}
}
}
@Override
public void cancel() {
isCancelled = true;
if (onGoingRetryableActions.isEmpty()) {
return;
}
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled");
// Dispatch to generic as cancellation calls can come on the cluster state applier thread
threadPool.generic().execute(() -> {
for (RetryableAction<?> action : onGoingRetryableActions.values()) {
action.cancel(exception);
}
onGoingRetryableActions.clear();
});
}
private <T extends TransportResponse> void executeRetryableAction(String action, TransportRequest request,
TransportRequestOptions options, ActionListener<T> actionListener,
Writeable.Reader<T> reader) {
final Object key = new Object();
final ActionListener<T> removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key));
final TimeValue initialDelay = TimeValue.timeValueMillis(200);
final TimeValue timeout = recoverySettings.internalActionRetryTimeout();
final RetryableAction<T> retryableAction = new RetryableAction<T>(logger, threadPool, initialDelay, timeout, removeListener) {
@Override
public void tryAction(ActionListener<T> listener) {
transportService.sendRequest(targetNode, action, request, options,
new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC));
}
@Override
public boolean shouldRetry(Exception e) {
return retryableException(e);
}
};
onGoingRetryableActions.put(key, retryableAction);
retryableAction.run();
if (isCancelled) {
retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled"));
}
}
private static boolean retryableException(Exception e) {
if (e instanceof RemoteTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof CircuitBreakingException ||
cause instanceof EsRejectedExecutionException;
}
return false;
}
}


@ -604,7 +604,7 @@ public class Node implements Closeable {
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
indicesService, recoverySettings));
indicesService, recoverySettings, bigArrays));
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
transportService, recoverySettings, clusterService));
}


@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class RetryableActionTests extends ESTestCase {
private DeterministicTaskQueue taskQueue;
@Before
public void setUp() throws Exception {
super.setUp();
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
taskQueue = new DeterministicTaskQueue(settings, random());
}
public void testRetryableActionNoRetries() {
final AtomicInteger executedCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
executedCount.getAndIncrement();
listener.onResponse(true);
}
@Override
public boolean shouldRetry(Exception e) {
return true;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
assertEquals(1, executedCount.get());
assertTrue(future.actionGet());
}
public void testRetryableActionWillRetry() {
int expectedRetryCount = randomIntBetween(1, 8);
final AtomicInteger remainingFailedCount = new AtomicInteger(expectedRetryCount);
final AtomicInteger retryCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
if (remainingFailedCount.getAndDecrement() == 0) {
listener.onResponse(true);
} else {
if (randomBoolean()) {
listener.onFailure(new EsRejectedExecutionException());
} else {
throw new EsRejectedExecutionException();
}
}
}
@Override
public boolean shouldRetry(Exception e) {
retryCount.getAndIncrement();
return e instanceof EsRejectedExecutionException;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
long previousDeferredTime = 0;
for (int i = 0; i < expectedRetryCount; ++i) {
assertTrue(taskQueue.hasDeferredTasks());
final long deferredExecutionTime = taskQueue.getLatestDeferredExecutionTime();
final long millisBound = 10 << i;
assertThat(deferredExecutionTime, lessThanOrEqualTo(millisBound + previousDeferredTime));
previousDeferredTime = deferredExecutionTime;
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
}
assertEquals(expectedRetryCount, retryCount.get());
assertTrue(future.actionGet());
}
public void testRetryableActionTimeout() {
final AtomicInteger retryCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(1), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
if (randomBoolean()) {
listener.onFailure(new EsRejectedExecutionException());
} else {
throw new EsRejectedExecutionException();
}
}
@Override
public boolean shouldRetry(Exception e) {
retryCount.getAndIncrement();
return e instanceof EsRejectedExecutionException;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
long previousDeferredTime = 0;
while (previousDeferredTime < 1000) {
assertTrue(taskQueue.hasDeferredTasks());
previousDeferredTime = taskQueue.getLatestDeferredExecutionTime();
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
}
assertFalse(taskQueue.hasDeferredTasks());
assertFalse(taskQueue.hasRunnableTasks());
expectThrows(EsRejectedExecutionException.class, future::actionGet);
}
public void testFailedBecauseNotRetryable() {
final AtomicInteger executedCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
executedCount.getAndIncrement();
throw new IllegalStateException();
}
@Override
public boolean shouldRetry(Exception e) {
return e instanceof EsRejectedExecutionException;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
assertEquals(1, executedCount.get());
expectThrows(IllegalStateException.class, future::actionGet);
}
public void testRetryableActionCancelled() {
final AtomicInteger executedCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
if (executedCount.incrementAndGet() == 1) {
throw new EsRejectedExecutionException();
} else {
listener.onResponse(true);
}
}
@Override
public boolean shouldRetry(Exception e) {
return e instanceof EsRejectedExecutionException;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
retryableAction.cancel(new ElasticsearchException("Cancelled"));
taskQueue.runAllRunnableTasks();
assertEquals(2, executedCount.get());
expectThrows(ElasticsearchException.class, future::actionGet);
}
}


@ -100,8 +100,8 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
countDownLatch.await();
logger.info("waiting for cluster to reform");
masterTransportService.clearRule(localTransportService);
nonMasterTransportService.clearRule(localTransportService);
masterTransportService.clearOutboundRules(localTransportService);
nonMasterTransportService.clearOutboundRules(localTransportService);
ensureStableCluster(2);


@ -138,7 +138,7 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());
senderTransportService.clearRule(receiverTransportService);
senderTransportService.clearOutboundRules(receiverTransportService);
}
}
});


@ -60,10 +60,13 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
@ -94,6 +97,7 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -107,7 +111,9 @@ import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -717,6 +723,143 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
}
public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "360s")
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), "10s")
.build();
// start a master node
internalCluster().startNode(nodeSettings);
final String blueNodeName = internalCluster()
.startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
final String redNodeName = internalCluster()
.startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));
client().admin().indices().prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(100, 8000);
// Index 3/4 of the documents and flush, then index the rest. This attempts to ensure that there
// is a mix of file chunks and translog ops
int threeFourths = (int) (numDocs * 0.75);
for (int i = 0; i < threeFourths; i++) {
requests.add(client().prepareIndex(indexName, "type").setSource("{}", XContentType.JSON));
}
indexRandom(true, requests);
flush(indexName);
requests.clear();
for (int i = threeFourths; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setSource("{}", XContentType.JSON));
}
indexRandom(true, requests);
ensureSearchable(indexName);
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
final String blueNodeId = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode().getId();
assertFalse(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty());
SearchResponse searchResponse = client().prepareSearch(indexName).get();
assertHitCount(searchResponse, numDocs);
String[] recoveryActions = new String[]{
PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
PeerRecoveryTargetService.Actions.FILES_INFO,
PeerRecoveryTargetService.Actions.FILE_CHUNK,
PeerRecoveryTargetService.Actions.CLEAN_FILES,
PeerRecoveryTargetService.Actions.FINALIZE
};
final String recoveryActionToBlock = randomFrom(recoveryActions);
logger.info("--> will break connection between blue & red on [{}]", recoveryActionToBlock);
MockTransportService blueTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
final SingleStartEnforcer validator = new SingleStartEnforcer();
blueTransportService.addSendBehavior(redTransportService, (connection, requestId, action, request, options) -> {
validator.accept(action);
connection.sendRequest(requestId, action, request, options);
});
redTransportService.addSendBehavior(blueTransportService, (connection, requestId, action, request, options) -> {
validator.accept(action);
connection.sendRequest(requestId, action, request, options);
});
blueTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock));
redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock));
try {
logger.info("--> starting recovery from blue to red");
client().admin().indices().prepareUpdateSettings(indexName).setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "red,blue")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
).get();
ensureGreen();
searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get();
assertHitCount(searchResponse, numDocs);
} finally {
blueTransportService.clearAllRules();
redTransportService.clearAllRules();
}
}
private class TransientReceiveRejected implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
private final String actionName;
private final AtomicInteger blocksRemaining;
private TransientReceiveRejected(String actionName) {
this.actionName = actionName;
this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3));
}
@Override
public void messageReceived(TransportRequestHandler<TransportRequest> handler, TransportRequest request, TransportChannel channel,
Task task) throws Exception {
if (blocksRemaining.updateAndGet(i -> i == 0 ? 0 : i - 1) != 0) {
logger.info("--> preventing {} response by throwing exception", actionName);
if (randomBoolean()) {
throw new EsRejectedExecutionException();
} else {
throw new CircuitBreakingException("Broken", CircuitBreaker.Durability.PERMANENT);
}
}
handler.messageReceived(request, channel, task);
}
}
private static class SingleStartEnforcer implements Consumer<String> {
private final AtomicBoolean recoveryStarted = new AtomicBoolean(false);
@Override
public void accept(String action) {
// The cluster state applier will immediately attempt to retry the recovery on a cluster state
// update. We want to assert that the first and only recovery attempt succeeds
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
if (recoveryStarted.compareAndSet(false, true) == false) {
throw new IllegalStateException("Recovery cannot be started twice");
}
}
}
}
public void testDisconnectsWhileRecovering() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
@ -819,7 +962,6 @@ public class IndexRecoveryIT extends ESIntegTestCase {
ensureGreen();
searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get();
assertHitCount(searchResponse, numDocs);
}
private class RecoveryActionBlocker implements StubbableTransport.SendRequestBehavior {


@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -39,7 +40,8 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
IndexShard primary = newStartedShard(true);
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
mock(TransportService.class), mock(IndicesService.class),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
BigArrays.NON_RECYCLING_INSTANCE);
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
SequenceNumbers.UNASSIGNED_SEQ_NO);


@ -152,8 +152,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
@ -1370,7 +1368,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
repositoriesService,
mock(SearchService.class),
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings, bigArrays),
snapshotShardsService,
new PrimaryReplicaSyncer(
transportService,


@ -405,7 +405,7 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
* @param targetTransportService target transport service to which requests are sent
*/
public void removeDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) {
sourceTransportService.clearRule(targetTransportService);
sourceTransportService.clearOutboundRules(targetTransportService);
}
/**


@ -196,19 +196,26 @@ public final class MockTransportService extends TransportService {
}
/**
* Clears the rule associated with the provided delegate service.
* Clears all the inbound rules.
*/
public void clearRule(TransportService transportService) {
public void clearInboundRules() {
transport().clearInboundBehaviors();
}
/**
* Clears the outbound rules associated with the provided delegate service.
*/
public void clearOutboundRules(TransportService transportService) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
clearRule(transportAddress);
clearOutboundRules(transportAddress);
}
}
/**
* Clears the rule associated with the provided delegate address.
* Clears the outbound rules associated with the provided delegate address.
*/
public void clearRule(TransportAddress transportAddress) {
transport().clearBehavior(transportAddress);
public void clearOutboundRules(TransportAddress transportAddress) {
transport().clearOutboundBehaviors(transportAddress);
connectionManager().clearBehavior(transportAddress);
}


@ -92,17 +92,25 @@ public class StubbableTransport implements Transport {
}
void clearBehaviors() {
this.defaultSendRequest = null;
sendBehaviors.clear();
this.defaultConnectBehavior = null;
connectBehaviors.clear();
clearOutboundBehaviors();
clearInboundBehaviors();
}
void clearInboundBehaviors() {
for (Map.Entry<String, RequestHandlerRegistry<?>> entry : replacedRequestRegistries.entrySet()) {
getRequestHandlers().forceRegister(entry.getValue());
}
replacedRequestRegistries.clear();
}
void clearBehavior(TransportAddress transportAddress) {
void clearOutboundBehaviors() {
this.defaultSendRequest = null;
sendBehaviors.clear();
this.defaultConnectBehavior = null;
connectBehaviors.clear();
}
void clearOutboundBehaviors(TransportAddress transportAddress) {
SendRequestBehavior behavior = sendBehaviors.remove(transportAddress);
if (behavior != null) {
behavior.clearCallback();