CCR should retry on CircuitBreakingException (#62013)

A CCR shard follow task can hit a CircuitBreakingException on the leader
cluster (when reading shard changes) or on the follower cluster (when
executing bulk requests). CCR should retry on CircuitBreakingException
because it is a transient error.
This commit is contained in:
Nhat Nguyen 2020-09-04 22:46:51 -04:00
parent ac23380560
commit 87c889f9c9
2 changed files with 10 additions and 2 deletions

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -567,7 +568,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
actual instanceof ConnectTransportException || actual instanceof ConnectTransportException ||
actual instanceof NodeClosedException || actual instanceof NodeClosedException ||
actual instanceof NoSuchRemoteClusterException || actual instanceof NoSuchRemoteClusterException ||
actual instanceof EsRejectedExecutionException; actual instanceof EsRejectedExecutionException ||
actual instanceof CircuitBreakingException;
} }
// These methods are protected for testing purposes: // These methods are protected for testing purposes:

View File

@ -7,9 +7,12 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
@ -311,7 +314,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
List<TestResponse> item = new ArrayList<>(); List<TestResponse> item = new ArrayList<>();
// Sometimes add a random retryable error // Sometimes add a random retryable error
if (sometimes()) { if (sometimes()) {
Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); Exception error = randomFrom(
new UnavailableShardsException(new ShardId("test", "test", 0), ""),
new CircuitBreakingException("test", randomInt(), randomInt(), randomFrom(CircuitBreaker.Durability.values())),
new EsRejectedExecutionException("test"));
item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); item.add(new TestResponse(error, mappingVersion, settingsVersion, null));
} }
// Sometimes add an empty shard changes response to also simulate a leader shard lagging behind // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind