From 87c889f9c99eaf4289f20adc8d8196839ecea1ad Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 4 Sep 2020 22:46:51 -0400 Subject: [PATCH] CCR should retry on CircuitBreakingException (#62013) CCR shard follow task can hit CircuitBreakingException on the leader cluster (read changes requests) or the follower cluster (bulk requests). CCR should retry on CircuitBreakingException as it's a transient error. --- .../xpack/ccr/action/ShardFollowNodeTask.java | 4 +++- .../xpack/ccr/action/ShardFollowNodeTaskRandomTests.java | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0ef9e557b7c..9845d5c8598 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; @@ -567,7 +568,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { actual instanceof ConnectTransportException || actual instanceof NodeClosedException || actual instanceof NoSuchRemoteClusterException || - actual instanceof EsRejectedExecutionException; + actual instanceof EsRejectedExecutionException || + actual instanceof CircuitBreakingException; } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 21d5d3547b5..48de6c32341 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -311,7 +314,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { List item = new ArrayList<>(); // Sometimes add a random retryable error if (sometimes()) { - Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); + Exception error = randomFrom( + new UnavailableShardsException(new ShardId("test", "test", 0), ""), + new CircuitBreakingException("test", randomInt(), randomInt(), randomFrom(CircuitBreaker.Durability.values())), + new EsRejectedExecutionException("test")); item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind