diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index b1d6467168c..a1a0e25e126 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -418,12 +418,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; - if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) { - int currentRetry = retryCounter.incrementAndGet(); - LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", - params.getFollowShardId(), currentRetry), e); - long delay = computeDelay(currentRetry, params.getReadPollTimeout().getMillis()); - scheduler.accept(TimeValue.timeValueMillis(delay), task); + if (shouldRetry(params.getRemoteCluster(), e)) { + if (isStopped() == false) { + // Only retry if the shard follow task is not stopped. 
+ int currentRetry = retryCounter.incrementAndGet(); + LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", + params.getFollowShardId(), currentRetry), e); + long delay = computeDelay(currentRetry, params.getReadPollTimeout().getMillis()); + scheduler.accept(TimeValue.timeValueMillis(delay), task); + } } else { fatalException = ExceptionsHelper.convertToElastic(e); LOGGER.warn("shard follow task encounter non-retryable error", e); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index cf7f901aaba..9929241fc23 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -40,7 +40,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; public class ShardFollowNodeTaskTests extends ESTestCase { @@ -287,6 +289,35 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } + public void testFatalExceptionNotSetWhenStoppingWhileFetchingOps() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + + mappingVersions.add(1L); + 
leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + responseSizes.add(64); + simulateResponse.set(true); + beforeSendShardChangesRequest = status -> { + // Cancel just before attempting to fetch operations: + task.onCancelled(); + }; + task.coordinateReads(); + + assertThat(task.isStopped(), is(true)); + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.getFatalException(), nullValue()); + assertThat(status.failedReadRequests(), equalTo(1L)); + assertThat(status.successfulReadRequests(), equalTo(0L)); + assertThat(status.readExceptions().size(), equalTo(1)); + } + public void testEmptyShardChangesResponseShouldClearFetchException() { ShardFollowTaskParams params = new ShardFollowTaskParams(); params.maxReadRequestOperationCount = 64;