From f7df8718b9b3b3cf5e3d348da60b8dc9dd5e2b4f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 16 Oct 2018 07:44:15 +0200 Subject: [PATCH] [CCR] Don't fail shard follow tasks in case of a non-retryable error (#34404) --- .../xpack/ccr/action/ShardFollowNodeTask.java | 9 +++-- .../action/TransportFollowStatsAction.java | 20 ++++++----- .../action/TransportPauseFollowAction.java | 35 ++++++++++++------ .../xpack/ccr/ShardChangesIT.java | 22 ++++++++++++ .../ShardFollowNodeTaskStatusTests.java | 3 +- .../ccr/action/ShardFollowNodeTaskTests.java | 11 ++---- .../ShardFollowTaskReplicationTests.java | 21 ++++------- .../xpack/ccr/action/StatsResponsesTests.java | 4 ++- .../ccr/FollowStatsMonitoringDocTests.java | 9 +++-- .../core/ccr/ShardFollowNodeTaskStatus.java | 36 ++++++++++++++++--- 10 files changed, 116 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index a10ee10f22a..48c04ad05a6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); private final LinkedHashMap> fetchExceptions; + private volatile ElasticsearchException fatalException; + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler, final LongSupplier relativeTimeProvider) { super(id, type, action, description, parentTask, headers); @@ -373,7 +375,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis()); scheduler.accept(TimeValue.timeValueMillis(delay), task); } else { - markAsFailed(e); + fatalException = ExceptionsHelper.convertToElastic(e); } } @@ -423,7 +425,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } protected boolean isStopped() { - return isCancelled() || isCompleted(); + return fatalException != null || isCancelled() || isCompleted(); } public ShardId getFollowShardId() { @@ -467,7 +469,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { .stream() .collect( Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))), - timeSinceLastFetchMillis); + timeSinceLastFetchMillis, + fatalException); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java index bce471d05e8..5ef790d3406 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java @@ -12,12 +12,12 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; @@ -25,19 +25,17 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.Consumer; +import java.util.stream.Collectors; public class TransportFollowStatsAction extends TransportTasksAction< ShardFollowNodeTask, FollowStatsAction.StatsRequest, FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> { - private final IndexNameExpressionResolver resolver; private final CcrLicenseChecker ccrLicenseChecker; @Inject @@ -46,7 +44,6 @@ public class TransportFollowStatsAction extends TransportTasksAction< final ClusterService clusterService, final TransportService transportService, final ActionFilters actionFilters, - final IndexNameExpressionResolver resolver, final CcrLicenseChecker ccrLicenseChecker) { super( settings, @@ -57,7 +54,6 @@ public class TransportFollowStatsAction extends TransportTasksAction< FollowStatsAction.StatsRequest::new, FollowStatsAction.StatsResponses::new, Ccr.CCR_THREAD_POOL_NAME); - this.resolver = Objects.requireNonNull(resolver); this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); } @@ -90,11 +86,19 @@ public class TransportFollowStatsAction extends TransportTasksAction< @Override protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer operation) { final ClusterState state = clusterService.state(); - final Set concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request))); + final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + final Set followerIndices = persistentTasksMetaData.tasks().stream() + .filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME)) + .map(persistentTask -> { + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + return shardFollowTask.getFollowShardId().getIndexName(); + }) + .collect(Collectors.toSet()); + for (final Task task : taskManager.getTasks().values()) { if (task instanceof ShardFollowNodeTask) { final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; - if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) { + if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) { operation.accept(shardFollowNodeTask); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java index 041d5e3f429..47fd785a0d3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -21,8 +20,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; public class TransportPauseFollowAction extends HandledTransportAction { @@ -48,18 +49,32 @@ public class TransportPauseFollowAction extends HandledTransportAction listener) { client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> { - IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex()); - if (followIndexMetadata == null) { - listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist")); + PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasksMetaData == null) { + listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]")); return; } - final int numShards = followIndexMetadata.getNumberOfShards(); - final AtomicInteger counter = new AtomicInteger(numShards); - final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); - for (int i = 0; i < numShards; i++) { - final int shardId = i; - String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + List shardFollowTaskIds = persistentTasksMetaData.tasks().stream() + .filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName())) + .filter(persistentTask -> { + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex()); + }) + .map(PersistentTasksCustomMetaData.PersistentTask::getId) + .collect(Collectors.toList()); + + if (shardFollowTaskIds.isEmpty()) { + listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]")); + return; + } + + final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size()); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(shardFollowTaskIds.size()); + int i = 0; + + for (String taskId : shardFollowTaskIds) { + final int shardId = i++; persistentTasksService.sendRemoveRequest(taskId, new ActionListener>() { @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 6a4b6191666..825cf78c11e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -647,6 +647,17 @@ public class ShardChangesIT extends ESIntegTestCase { assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet(); + assertBusy(() -> { + StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L)); + ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); + assertThat(fatalException, notNullValue()); + assertThat(fatalException.getMessage(), equalTo("no such index")); + }); + unfollowIndex("index2"); ensureNoCcrTasks(); } @@ -666,6 +677,17 @@ public class ShardChangesIT extends ESIntegTestCase { client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L)); + ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); + assertThat(fatalException, notNullValue()); + assertThat(fatalException.getMessage(), equalTo("no such index")); + }); + unfollowIndex("index2"); ensureNoCcrTasks(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 48e4359ccb5..3b8c13dda88 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -56,7 +56,8 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< randomNonNegativeLong(), randomNonNegativeLong(), randomReadExceptions(), - randomLong()); + randomLong(), + randomBoolean() ? new ElasticsearchException("fatal error") : null); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index e772516e331..b221a79e69e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -45,7 +45,6 @@ import static org.hamcrest.Matchers.sameInstance; public class ShardFollowNodeTaskTests extends ESTestCase { - private Exception fatalError; private List shardChangesRequests; private List> bulkShardOperationRequests; private BiConsumer scheduler = (delay, task) -> task.run(); @@ -345,7 +344,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); assertTrue("task is stopped", task.isStopped()); - assertThat(fatalError, sameInstance(failure)); + assertThat(task.getStatus().getFatalException().getRootCause(), sameInstance(failure)); ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); @@ -791,19 +790,13 @@ public class ShardFollowNodeTaskTests extends ESTestCase { @Override protected boolean isStopped() { - return stopped.get(); + return super.isStopped() || stopped.get(); } @Override public void markAsCompleted() { stopped.set(true); } - - @Override - public void markAsFailed(Exception e) { - fatalError = e; - stopped.set(true); - } }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 5539bc6ae47..7b95252c866 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action; import com.carrotsearch.hppc.LongHashSet; import com.carrotsearch.hppc.LongSet; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; @@ -46,7 +47,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -180,7 +180,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest assertBusy(() -> { assertThat(shardFollowTask.isStopped(), is(true)); - assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + ElasticsearchException failure = shardFollowTask.getStatus().getFatalException(); + assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + "], actual [" + newHistoryUUID + "]")); }); } @@ -221,7 +222,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest assertBusy(() -> { assertThat(shardFollowTask.isStopped(), is(true)); - assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + ElasticsearchException failure = shardFollowTask.getStatus().getFatalException(); + assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated")); }); } @@ -325,7 +327,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); - AtomicReference failureHolder = new AtomicReference<>(); LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -403,7 +404,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest @Override protected boolean isStopped() { - return stopped.get(); + return super.isStopped() || stopped.get(); } @Override @@ -411,16 +412,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest stopped.set(true); } - @Override - public void markAsFailed(Exception e) { - failureHolder.set(e); - stopped.set(true); - } - - @Override - public Exception getFailure() { - return failureHolder.get(); - } }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 1de949b850b..6c82852fca1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -48,7 +49,8 @@ public class StatsResponsesTests extends AbstractStreamableTestCase serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 759823ef786..26341753b37 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); private static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); private static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField("time_since_last_fetch_millis"); + private static final ParseField FATAL_EXCEPTION = new ParseField("fatal_exception"); @SuppressWarnings("unchecked") static final ConstructingObjectParser STATUS_PARSER = @@ -88,7 +89,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status { ((List>>) args[21]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[22])); + (long) args[22], + (ElasticsearchException) args[23])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -121,6 +123,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_FETCH_MILLIS_FIELD); + STATUS_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + FATAL_EXCEPTION); } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); @@ -274,6 +279,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { return timeSinceLastFetchMillis; } + private final ElasticsearchException fatalException; + + public ElasticsearchException getFatalException() { + return fatalException; + } + public ShardFollowNodeTaskStatus( final String leaderIndex, final String followerIndex, @@ -297,7 +308,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status { final long numberOfFailedBulkOperations, final long numberOfOperationsIndexed, final NavigableMap> fetchExceptions, - final long timeSinceLastFetchMillis) { + final long timeSinceLastFetchMillis, + final ElasticsearchException fatalException) { this.leaderIndex = leaderIndex; this.followerIndex = followerIndex; this.shardId = shardId; @@ -321,6 +333,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfOperationsIndexed = numberOfOperationsIndexed; this.fetchExceptions = Objects.requireNonNull(fetchExceptions); this.timeSinceLastFetchMillis = timeSinceLastFetchMillis; + this.fatalException = fatalException; } public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { @@ -348,6 +361,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException()))); this.timeSinceLastFetchMillis = in.readZLong(); + this.fatalException = in.readException(); } @Override @@ -386,6 +400,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { stream.writeException(value.v2()); }); out.writeZLong(timeSinceLastFetchMillis); + out.writeException(fatalException); } @Override @@ -451,6 +466,14 @@ public class ShardFollowNodeTaskStatus implements Task.Status { TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(), "time_since_last_fetch", new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS)); + if (fatalException != null) { + builder.field(FATAL_EXCEPTION.getPreferredName()); + builder.startObject(); + { + ElasticsearchException.generateThrowableXContent(builder, params, fatalException); + } + builder.endObject(); + } return builder; } @@ -463,6 +486,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o; + String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null; + String otherFatalExceptionMessage = that.fatalException != null ? that.fatalException.getMessage() : null; return leaderIndex.equals(that.leaderIndex) && followerIndex.equals(that.followerIndex) && shardId == that.shardId && @@ -490,11 +515,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { */ fetchExceptions.keySet().equals(that.fetchExceptions.keySet()) && getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) && - timeSinceLastFetchMillis == that.timeSinceLastFetchMillis; + timeSinceLastFetchMillis == that.timeSinceLastFetchMillis && + Objects.equals(fatalExceptionMessage, otherFatalExceptionMessage); } @Override public int hashCode() { + String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null; return Objects.hash( leaderIndex, followerIndex, @@ -522,7 +549,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status { */ fetchExceptions.keySet(), getFetchExceptionMessages(this), - timeSinceLastFetchMillis); + timeSinceLastFetchMillis, + fatalExceptionMessage); } private static List getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {