From cd228095dfeb6e1e93d4b5d24b161f611a6ecda3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 28 Apr 2020 13:52:49 -0600 Subject: [PATCH] Retry failed peer recovery due to transient errors (#55883) Currently a failed peer recovery action will fail the recovery. This includes cases where the recovery fails due to potentially short-lived transient issues such as rejected execution exceptions or circuit breaking errors. This commit adds the concept of a retryable action. A retryable action will be retried in the face of certain errors. The action will be retried after an exponentially increasing backoff period. After a defined time, the action will time out. This commit only implements retries for responses that indicate the target node has NOT executed the action. --- .../action/support/RetryableAction.java | 174 +++++++++++++++ .../recovery/PeerRecoverySourceService.java | 7 +- .../indices/recovery/RecoverySettings.java | 11 + .../recovery/RecoverySourceHandler.java | 1 + .../recovery/RecoveryTargetHandler.java | 1 + .../recovery/RemoteRecoveryTargetHandler.java | 170 +++++++++++--- .../java/org/elasticsearch/node/Node.java | 2 +- .../action/support/RetryableActionTests.java | 207 ++++++++++++++++++ .../discovery/DiscoveryDisruptionIT.java | 4 +- .../index/seqno/GlobalCheckpointSyncIT.java | 2 +- .../indices/recovery/IndexRecoveryIT.java | 144 +++++++++++- .../PeerRecoverySourceServiceTests.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 4 +- .../test/disruption/NetworkDisruption.java | 2 +- .../test/transport/MockTransportService.java | 19 +- .../test/transport/StubbableTransport.java | 18 +- 16 files changed, 712 insertions(+), 58 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/RetryableAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java new file mode 100644 index 00000000000..d1101a60702 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -0,0 +1,174 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.elasticsearch.action.support; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayDeque; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An action that will be retried on failure if {@link RetryableAction#shouldRetry(Exception)} returns true. + * The executor the action will be executed on can be defined in the constructor. Otherwise, SAME is the + * default. The action will be retried with exponentially increasing delay periods until the timeout period + * has been reached. + */ +public abstract class RetryableAction<Response> { + + private final Logger logger; + + private final AtomicBoolean isDone = new AtomicBoolean(false); + private final ThreadPool threadPool; + private final long initialDelayMillis; + private final long timeoutMillis; + private final long startMillis; + private final ActionListener<Response> finalListener; + private final String executor; + + public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue, + ActionListener<Response> listener) { + this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); + } + + public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue, + ActionListener<Response> listener, String executor) { + this.logger = logger; + this.threadPool = threadPool; + this.initialDelayMillis = initialDelay.getMillis(); + if (initialDelayMillis < 1) { + throw new IllegalArgumentException("Initial delay was less than 1 millisecond: " + initialDelay); + } + this.timeoutMillis = Math.max(timeoutValue.getMillis(), 1); + this.startMillis = threadPool.relativeTimeInMillis(); + this.finalListener = listener; + this.executor = executor; + } + + public void run() { + final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null); + final Runnable runnable = createRunnable(retryingListener); + threadPool.executor(executor).execute(runnable); + } + + public void cancel(Exception e) { + if (isDone.compareAndSet(false, true)) { + finalListener.onFailure(e); + } + } + + private Runnable createRunnable(RetryingListener retryingListener) { + return new ActionRunnable<Response>(retryingListener) { + + @Override + protected void doRun() { + tryAction(listener); + } + + @Override + public void onRejection(Exception e) { + // TODO: The only implementations of this class use SAME which means the execution will not be + // rejected. Future implementations can adjust this functionality as needed.
+ onFailure(e); + } + }; + } + + public abstract void tryAction(ActionListener<Response> listener); + + public abstract boolean shouldRetry(Exception e); + + private class RetryingListener implements ActionListener<Response> { + + private static final int MAX_EXCEPTIONS = 4; + + private final long delayMillisBound; + private ArrayDeque<Exception> caughtExceptions; + + private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) { + this.delayMillisBound = delayMillisBound; + this.caughtExceptions = caughtExceptions; + } + + @Override + public void onResponse(Response response) { + if (isDone.compareAndSet(false, true)) { + finalListener.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + if (shouldRetry(e)) { + final long elapsedMillis = threadPool.relativeTimeInMillis() - startMillis; + if (elapsedMillis >= timeoutMillis) { + logger.debug(() -> new ParameterizedMessage("retryable action timed out after {}", + TimeValue.timeValueMillis(elapsedMillis)), e); + addException(e); + if (isDone.compareAndSet(false, true)) { + finalListener.onFailure(buildFinalException()); + } + } else { + addException(e); + + final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE); + final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions); + final Runnable runnable = createRunnable(retryingListener); + final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1; + if (isDone.get() == false) { + final TimeValue delay = TimeValue.timeValueMillis(delayMillis); + logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); + threadPool.schedule(runnable, delay, executor); + } + } + } else { + addException(e); + if (isDone.compareAndSet(false, true)) { + finalListener.onFailure(buildFinalException()); + } + } + } + + private Exception buildFinalException() { + final Exception topLevel = caughtExceptions.removeFirst(); + Exception suppressed; + while ((suppressed = caughtExceptions.pollFirst()) != null) { + topLevel.addSuppressed(suppressed); + } + return topLevel; + } + + private void addException(Exception e) { + if (caughtExceptions != null) { + if (caughtExceptions.size() == MAX_EXCEPTIONS) { + caughtExceptions.removeLast(); + } + } else { + caughtExceptions = new ArrayDeque<>(MAX_EXCEPTIONS); + } + caughtExceptions.addFirst(e); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 644a8e2eb5c..736a6e4c110 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; @@ -64,15 +65,17 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem private final TransportService transportService; private final IndicesService indicesService; private final RecoverySettings recoverySettings; + private final BigArrays bigArrays; final
OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); @Inject public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, - RecoverySettings recoverySettings) { + RecoverySettings recoverySettings, BigArrays bigArrays) { this.transportService = transportService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; + this.bigArrays = bigArrays; transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new, new StartRecoveryTransportRequestHandler()); } @@ -222,7 +225,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) { RecoverySourceHandler handler; final RemoteRecoveryTargetHandler recoveryTarget = - new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, + new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, bigArrays, request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks()); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 469529bb09f..9abd5dcf08f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -63,6 +63,11 @@ public class RecoverySettings { Setting.positiveTimeSetting("indices.recovery.internal_action_timeout", TimeValue.timeValueMinutes(15), Property.Dynamic, Property.NodeScope); + /** timeout value to use for the retrying of requests made as part of the recovery process */ + public static final Setting INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING = + Setting.positiveTimeSetting("indices.recovery.internal_action_retry_timeout", TimeValue.timeValueMinutes(1), + Property.Dynamic, Property.NodeScope); + /** * timeout value to use for requests made as part of the recovery process that are expected to take long time. * defaults to twice `indices.recovery.internal_action_timeout`. 
@@ -91,6 +96,7 @@ public class RecoverySettings { private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; private volatile TimeValue internalActionTimeout; + private volatile TimeValue internalActionRetryTimeout; private volatile TimeValue internalActionLongTimeout; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @@ -103,6 +109,7 @@ public class RecoverySettings { this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings); this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings); + this.internalActionRetryTimeout = INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.get(settings); this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings); this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); @@ -146,6 +153,10 @@ public class RecoverySettings { return internalActionTimeout; } + public TimeValue internalActionRetryTimeout() { + return internalActionRetryTimeout; + } + public TimeValue internalActionLongTimeout() { return internalActionLongTimeout; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e3a3b70357a..d5141e65ce6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -818,6 +818,7 @@ public class RecoverySourceHandler { */ public void cancel(String reason) { cancellableThreads.cancel(reason); + recoveryTarget.cancel(); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index c6d8ccbda0b..709f1c5b1cd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -106,4 +106,5 @@ public interface RecoveryTargetHandler { void writeFileChunk(StoreFileMetadata fileMetadata, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener); + default void cancel() {} } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 9cf4a395fa7..11ecb782079 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -19,12 +19,24 @@ package org.elasticsearch.indices.recovery; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import 
org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; @@ -33,22 +45,30 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { + private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class); + private final TransportService transportService; + private final ThreadPool threadPool; private final long recoveryId; private final ShardId shardId; + private final BigArrays bigArrays; private final DiscoveryNode targetNode; private final RecoverySettings recoverySettings; + private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); private final TransportRequestOptions translogOpsRequestOptions; private final TransportRequestOptions fileChunkRequestOptions; @@ -56,12 +76,15 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final Consumer onSourceThrottle; + private volatile boolean isCancelled = false; - public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, + public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, BigArrays bigArrays, DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer onSourceThrottle) { this.transportService = transportService; + this.threadPool = transportService.getThreadPool(); this.recoveryId = recoveryId; this.shardId = shardId; + this.bigArrays = bigArrays; this.targetNode = targetNode; this.recoverySettings = recoverySettings; this.onSourceThrottle = onSourceThrottle; @@ -73,25 +96,30 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) .build(); - } @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), - in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG; + final RecoveryPrepareForTranslogOperationsRequest request 
= + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps); + final TransportRequestOptions options = + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + executeRetryableAction(action, request, options, responseListener, reader); } @Override public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener listener) { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE, - new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), - in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + final String action = PeerRecoveryTargetService.Actions.FINALIZE; + final RecoveryFinalizeRecoveryRequest request = + new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo); + final TransportRequestOptions options = + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + executeRetryableAction(action, request, options, responseListener, reader); } @Override @@ -113,6 +141,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { final RetentionLeases retentionLeases, final long mappingVersionOnPrimary, final ActionListener listener) { + final String action = PeerRecoveryTargetService.Actions.TRANSLOG_OPS; final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest( recoveryId, shardId, @@ -122,30 +151,35 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, mappingVersionOnPrimary); - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions, - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint), - RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); + final Writeable.Reader reader = RecoveryTranslogOperationsResponse::new; + final ActionListener responseListener = ActionListener.map(listener, r -> r.localCheckpoint); + executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } @Override public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, int totalTranslogOps, ActionListener listener) { - RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId, - phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest, - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), - in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + final String action = 
PeerRecoveryTargetService.Actions.FILES_INFO; + RecoveryFilesInfoRequest request = new RecoveryFilesInfoRequest(recoveryId, shardId, phase1FileNames, phase1FileSizes, + phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); + final TransportRequestOptions options = + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + executeRetryableAction(action, request, options, responseListener, reader); } @Override public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener) { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), - in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + final String action = PeerRecoveryTargetService.Actions.CLEAN_FILES; + final RecoveryCleanFilesRequest request = + new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint); + final TransportRequestOptions options = + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + executeRetryableAction(action, request, options, responseListener, reader); } @Override @@ -173,15 +207,81 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { throttleTimeInNanos = 0; } - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK, - new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata, position, content, lastChunk, - totalTranslogOps, - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. - */ - throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>( - ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK; + final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(content.length(), bigArrays); + boolean actionStarted = false; + try { + content.writeTo(output); + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. 
+ */ + final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata, + position, output.bytes(), lastChunk, totalTranslogOps, throttleTimeInNanos); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + final ActionListener releaseListener = ActionListener.runBefore(responseListener, output::close); + executeRetryableAction(action, request, fileChunkRequestOptions, releaseListener, reader); + actionStarted = true; + } catch (IOException e) { + // Since the content data is buffer in memory, we should never get an exception. + throw new AssertionError(e); + } finally { + if (actionStarted == false) { + output.close(); + } + } } + @Override + public void cancel() { + isCancelled = true; + if (onGoingRetryableActions.isEmpty()) { + return; + } + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled"); + // Dispatch to generic as cancellation calls can come on the cluster state applier thread + threadPool.generic().execute(() -> { + for (RetryableAction action : onGoingRetryableActions.values()) { + action.cancel(exception); + } + onGoingRetryableActions.clear(); + }); + } + + private void executeRetryableAction(String action, TransportRequest request, + TransportRequestOptions options, ActionListener actionListener, + Writeable.Reader reader) { + final Object key = new Object(); + final ActionListener removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key)); + final TimeValue initialDelay = TimeValue.timeValueMillis(200); + final TimeValue timeout = recoverySettings.internalActionRetryTimeout(); + final RetryableAction retryableAction = new RetryableAction(logger, threadPool, initialDelay, timeout, removeListener) { + + @Override + public void tryAction(ActionListener listener) { + transportService.sendRequest(targetNode, action, request, options, + new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)); + } + + @Override + public boolean shouldRetry(Exception e) { + return retryableException(e); + } + }; + onGoingRetryableActions.put(key, retryableAction); + retryableAction.run(); + if (isCancelled) { + retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled")); + } + } + + private static boolean retryableException(Exception e) { + if (e instanceof RemoteTransportException) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof CircuitBreakingException || + cause instanceof EsRejectedExecutionException; + } + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1fbfebc8916..c858872e91d 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -604,7 +604,7 @@ public class Node implements Closeable { RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService, - indicesService, recoverySettings)); + indicesService, recoverySettings, bigArrays)); b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool, transportService, 
recoverySettings, clusterService)); } diff --git a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java new file mode 100644 index 00000000000..34b57e94310 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java @@ -0,0 +1,207 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class RetryableActionTests extends ESTestCase { + + private DeterministicTaskQueue taskQueue; + + @Before + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + taskQueue = new DeterministicTaskQueue(settings, random()); + } + + public void testRetryableActionNoRetries() { + final AtomicInteger executedCount = new AtomicInteger(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) { + + @Override + public void tryAction(ActionListener listener) { + executedCount.getAndIncrement(); + listener.onResponse(true); + } + + @Override + public boolean shouldRetry(Exception e) { + return true; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + + assertEquals(1, executedCount.get()); + assertTrue(future.actionGet()); + } + + public void testRetryableActionWillRetry() { + int expectedRetryCount = randomIntBetween(1, 8); + final AtomicInteger remainingFailedCount = new AtomicInteger(expectedRetryCount); + final AtomicInteger retryCount = new AtomicInteger(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) { + + @Override + public void tryAction(ActionListener listener) { + if (remainingFailedCount.getAndDecrement() == 0) { + listener.onResponse(true); + } else { + if (randomBoolean()) { + listener.onFailure(new 
EsRejectedExecutionException()); + } else { + throw new EsRejectedExecutionException(); + } + } + } + + @Override + public boolean shouldRetry(Exception e) { + retryCount.getAndIncrement(); + return e instanceof EsRejectedExecutionException; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + long previousDeferredTime = 0; + for (int i = 0; i < expectedRetryCount; ++i) { + assertTrue(taskQueue.hasDeferredTasks()); + final long deferredExecutionTime = taskQueue.getLatestDeferredExecutionTime(); + final long millisBound = 10 << i; + assertThat(deferredExecutionTime, lessThanOrEqualTo(millisBound + previousDeferredTime)); + previousDeferredTime = deferredExecutionTime; + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + } + + assertEquals(expectedRetryCount, retryCount.get()); + assertTrue(future.actionGet()); + } + + public void testRetryableActionTimeout() { + final AtomicInteger retryCount = new AtomicInteger(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(1), future) { + + @Override + public void tryAction(ActionListener listener) { + if (randomBoolean()) { + listener.onFailure(new EsRejectedExecutionException()); + } else { + throw new EsRejectedExecutionException(); + } + } + + @Override + public boolean shouldRetry(Exception e) { + retryCount.getAndIncrement(); + return e instanceof EsRejectedExecutionException; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + long previousDeferredTime = 0; + while (previousDeferredTime < 1000) { + assertTrue(taskQueue.hasDeferredTasks()); + previousDeferredTime = taskQueue.getLatestDeferredExecutionTime(); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + } + + assertFalse(taskQueue.hasDeferredTasks()); + assertFalse(taskQueue.hasRunnableTasks()); + + expectThrows(EsRejectedExecutionException.class, future::actionGet); + } + + public void testFailedBecauseNotRetryable() { + final AtomicInteger executedCount = new AtomicInteger(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) { + + @Override + public void tryAction(ActionListener listener) { + executedCount.getAndIncrement(); + throw new IllegalStateException(); + } + + @Override + public boolean shouldRetry(Exception e) { + return e instanceof EsRejectedExecutionException; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + + assertEquals(1, executedCount.get()); + expectThrows(IllegalStateException.class, future::actionGet); + } + + public void testRetryableActionCancelled() { + final AtomicInteger executedCount = new AtomicInteger(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(30), future) { + + @Override + public void tryAction(ActionListener listener) { + if (executedCount.incrementAndGet() == 1) { + throw new EsRejectedExecutionException(); + } else { + listener.onResponse(true); + } + } + + @Override + public boolean shouldRetry(Exception e) { + return e instanceof EsRejectedExecutionException; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + 
assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + + retryableAction.cancel(new ElasticsearchException("Cancelled")); + taskQueue.runAllRunnableTasks(); + + assertEquals(2, executedCount.get()); + expectThrows(ElasticsearchException.class, future::actionGet); + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java index a615b3c98f4..8415ac526e1 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java @@ -100,8 +100,8 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase { countDownLatch.await(); logger.info("waiting for cluster to reform"); - masterTransportService.clearRule(localTransportService); - nonMasterTransportService.clearRule(localTransportService); + masterTransportService.clearOutboundRules(localTransportService); + nonMasterTransportService.clearOutboundRules(localTransportService); ensureStableCluster(2); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java index 75aa7f1676d..d9cb3dc1553 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -138,7 +138,7 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase { (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); final MockTransportService receiverTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); - senderTransportService.clearRule(receiverTransportService); + senderTransportService.clearOutboundRules(receiverTransportService); } } }); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 79171fd83b5..e9e3cd0e502 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -60,10 +60,13 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -94,6 +97,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -107,7 +111,9 @@ import 
org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -717,6 +723,143 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f)); } + public void testTransientErrorsDuringRecoveryAreRetried() throws Exception { + final String indexName = "test"; + final Settings nodeSettings = Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "360s") + .put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), "10s") + .build(); + // start a master node + internalCluster().startNode(nodeSettings); + + final String blueNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster() + .startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); + assertThat(response.isTimedOut(), is(false)); + + client().admin().indices().prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + + List requests = new ArrayList<>(); + int numDocs = scaledRandomIntBetween(100, 8000); + // Index 3/4 of the documents and flush. And then index the rest. 
This attempts to ensure that there + // is a mix of file chunks and translog ops + int threeFourths = (int) (numDocs * 0.75); + for (int i = 0; i < threeFourths; i++) { + requests.add(client().prepareIndex(indexName, "type").setSource("{}", XContentType.JSON)); + } + indexRandom(true, requests); + flush(indexName); + requests.clear(); + + for (int i = threeFourths; i < numDocs; i++) { + requests.add(client().prepareIndex(indexName, "type").setSource("{}", XContentType.JSON)); + } + indexRandom(true, requests); + ensureSearchable(indexName); + + ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get(); + final String blueNodeId = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode().getId(); + + assertFalse(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty()); + + SearchResponse searchResponse = client().prepareSearch(indexName).get(); + assertHitCount(searchResponse, numDocs); + + String[] recoveryActions = new String[]{ + PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, + PeerRecoveryTargetService.Actions.TRANSLOG_OPS, + PeerRecoveryTargetService.Actions.FILES_INFO, + PeerRecoveryTargetService.Actions.FILE_CHUNK, + PeerRecoveryTargetService.Actions.CLEAN_FILES, + PeerRecoveryTargetService.Actions.FINALIZE + }; + final String recoveryActionToBlock = randomFrom(recoveryActions); + logger.info("--> will break connection between blue & red on [{}]", recoveryActionToBlock); + + MockTransportService blueTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); + MockTransportService redTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); + + final SingleStartEnforcer validator = new SingleStartEnforcer(); + blueTransportService.addSendBehavior(redTransportService, (connection, requestId, action, request, options) -> { + validator.accept(action); + connection.sendRequest(requestId, action, request, options); + }); + redTransportService.addSendBehavior(blueTransportService, (connection, requestId, action, request, options) -> { + validator.accept(action); + connection.sendRequest(requestId, action, request, options); + }); + blueTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock)); + redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock)); + + try { + logger.info("--> starting recovery from blue to red"); + client().admin().indices().prepareUpdateSettings(indexName).setSettings( + Settings.builder() + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "red,blue") + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ).get(); + + ensureGreen(); + searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get(); + assertHitCount(searchResponse, numDocs); + } finally { + blueTransportService.clearAllRules(); + redTransportService.clearAllRules(); + } + } + + private class TransientReceiveRejected implements StubbableTransport.RequestHandlingBehavior { + + private final String actionName; + private final AtomicInteger blocksRemaining; + + private TransientReceiveRejected(String actionName) { + this.actionName = actionName; + this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3)); + } + + @Override + public void messageReceived(TransportRequestHandler handler, TransportRequest request, TransportChannel channel, + Task task) 
throws Exception { + if (blocksRemaining.updateAndGet(i -> i == 0 ? 0 : i - 1) != 0) { + logger.info("--> preventing {} response by throwing exception", actionName); + if (randomBoolean()) { + throw new EsRejectedExecutionException(); + } else { + throw new CircuitBreakingException("Broken", CircuitBreaker.Durability.PERMANENT); + } + } + handler.messageReceived(request, channel, task); + } + } + + private static class SingleStartEnforcer implements Consumer { + + private final AtomicBoolean recoveryStarted = new AtomicBoolean(false); + + @Override + public void accept(String action) { + // The cluster state applier will immediately attempt to retry the recovery on a cluster state + // update. We want to assert that the first and only recovery attempt succeeds + if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) { + if (recoveryStarted.compareAndSet(false, true) == false) { + throw new IllegalStateException("Recovery cannot be started twice"); + } + } + } + } + public void testDisconnectsWhileRecovering() throws Exception { final String indexName = "test"; final Settings nodeSettings = Settings.builder() @@ -819,7 +962,6 @@ public class IndexRecoveryIT extends ESIntegTestCase { ensureGreen(); searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get(); assertHitCount(searchResponse, numDocs); - } private class RecoveryActionBlocker implements StubbableTransport.SendRequestBehavior { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 491c3974e5b..de46f0d9a5d 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -39,7 +40,8 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase { IndexShard primary = newStartedShard(true); PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService( mock(TransportService.class), mock(IndicesService.class), - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + BigArrays.NON_RECYCLING_INSTANCE); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), SequenceNumbers.UNASSIGNED_SEQ_NO); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6751669b015..e40c43a6ba7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -152,8 +152,6 @@ import org.elasticsearch.index.Index; import 
org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; -import org.elasticsearch.index.seqno.RetentionLeaseSyncer; -import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; @@ -1370,7 +1368,7 @@ public class SnapshotResiliencyTests extends ESTestCase { repositoriesService, mock(SearchService.class), new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), - new PeerRecoverySourceService(transportService, indicesService, recoverySettings), + new PeerRecoverySourceService(transportService, indicesService, recoverySettings, bigArrays), snapshotShardsService, new PrimaryReplicaSyncer( transportService, diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java index cce5ed2ee28..31d053e814c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -405,7 +405,7 @@ public class NetworkDisruption implements ServiceDisruptionScheme { * @param targetTransportService target transport service to which requests are sent */ public void removeDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) { - sourceTransportService.clearRule(targetTransportService); + sourceTransportService.clearOutboundRules(targetTransportService); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 83085ae8741..1ae3fed774e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -196,19 +196,26 @@ public final class MockTransportService extends TransportService { } /** - * Clears the rule associated with the provided delegate service. + * Clears all the inbound rules. */ - public void clearRule(TransportService transportService) { + public void clearInboundRules() { + transport().clearInboundBehaviors(); + } + + /** + * Clears the outbound rules associated with the provided delegate service. + */ + public void clearOutboundRules(TransportService transportService) { for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { - clearRule(transportAddress); + clearOutboundRules(transportAddress); } } /** - * Clears the rule associated with the provided delegate address. + * Clears the outbound rules associated with the provided delegate address. 
*/ - public void clearRule(TransportAddress transportAddress) { - transport().clearBehavior(transportAddress); + public void clearOutboundRules(TransportAddress transportAddress) { + transport().clearOutboundBehaviors(transportAddress); connectionManager().clearBehavior(transportAddress); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 45355a520d5..2226974d40a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -92,17 +92,25 @@ public class StubbableTransport implements Transport { } void clearBehaviors() { - this.defaultSendRequest = null; - sendBehaviors.clear(); - this.defaultConnectBehavior = null; - connectBehaviors.clear(); + clearOutboundBehaviors(); + clearInboundBehaviors(); + } + + void clearInboundBehaviors() { for (Map.Entry> entry : replacedRequestRegistries.entrySet()) { getRequestHandlers().forceRegister(entry.getValue()); } replacedRequestRegistries.clear(); } - void clearBehavior(TransportAddress transportAddress) { + void clearOutboundBehaviors() { + this.defaultSendRequest = null; + sendBehaviors.clear(); + this.defaultConnectBehavior = null; + connectBehaviors.clear(); + } + + void clearOutboundBehaviors(TransportAddress transportAddress) { SendRequestBehavior behavior = sendBehaviors.remove(transportAddress); if (behavior != null) { behavior.clearCallback();
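For readers of this patch, the following is a minimal usage sketch, not part of the change itself, showing how a caller could wrap a flaky operation in the new RetryableAction. It is based only on the constructor, run, tryAction and shouldRetry members added in RetryableAction.java above and on the patterns used in RetryableActionTests; the class name RetryableActionExample, the runWithRetries method and the flakyOperation helper are hypothetical.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;

public final class RetryableActionExample {

    private static final Logger logger = LogManager.getLogger(RetryableActionExample.class);

    /**
     * Runs a (hypothetical) flaky operation with retries: the first retry is scheduled within
     * 50ms, later retries are drawn randomly under an exponentially doubling bound, and the
     * whole action fails with the accumulated exceptions once one minute has elapsed.
     */
    public static PlainActionFuture<Boolean> runWithRetries(ThreadPool threadPool) {
        final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
        final RetryableAction<Boolean> action = new RetryableAction<Boolean>(logger, threadPool,
            TimeValue.timeValueMillis(50), TimeValue.timeValueMinutes(1), future) {

            @Override
            public void tryAction(ActionListener<Boolean> listener) {
                flakyOperation(listener);
            }

            @Override
            public boolean shouldRetry(Exception e) {
                // Only retry errors indicating the work was rejected before it executed.
                return e instanceof EsRejectedExecutionException;
            }
        };
        action.run();
        return future;
    }

    private static void flakyOperation(ActionListener<Boolean> listener) {
        // Placeholder: a real caller would dispatch work here and complete the listener.
        listener.onResponse(true);
    }
}

As in RemoteRecoveryTargetHandler above, shouldRetry is the hook that restricts retries to errors implying the remote node never executed the request, so the action is safe to resend.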
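Similarly, here is a small sketch of how the new indices.recovery.internal_action_retry_timeout setting (added to RecoverySettings above, defaulting to one minute) could be supplied through node settings. The construction mirrors PeerRecoverySourceServiceTests; the five-second value and the example class name are illustrative assumptions only.

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;

public final class RecoveryRetryTimeoutExample {

    /** Builds RecoverySettings with the retry window shortened from the 1m default to 5s. */
    public static RecoverySettings buildRecoverySettings() {
        final Settings settings = Settings.builder()
            .put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.getKey(), "5s")
            .build();
        final RecoverySettings recoverySettings =
            new RecoverySettings(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
        final TimeValue retryTimeout = recoverySettings.internalActionRetryTimeout();
        assert retryTimeout.getSeconds() == 5;
        return recoverySettings;
    }
}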