[CCR] Don't fail shard follow tasks in case of a non-retryable error (#34404)

This commit is contained in:
Martijn van Groningen 2018-10-16 07:44:15 +02:00 committed by GitHub
parent 92b2e1a209
commit f7df8718b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 116 additions and 54 deletions

View File

@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
private volatile ElasticsearchException fatalException;
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
@ -373,7 +375,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
fatalException = ExceptionsHelper.convertToElastic(e);
}
}
@ -423,7 +425,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
/**
 * Reports whether this task should be considered stopped.
 *
 * NOTE(review): the two consecutive return statements below look like the before/after
 * lines of a diff hunk whose +/- markers were lost, not intentional control flow. As
 * written, the first return decides the result and the second can never execute;
 * confirm which one is meant to be live.
 *
 * @return as written, {@code true} when {@code isCancelled()} or {@code isCompleted()} returns true
 */
protected boolean isStopped() {
return isCancelled() || isCompleted();
// Variant that additionally reports stopped once fatalException is non-null.
return fatalException != null || isCancelled() || isCompleted();
}
public ShardId getFollowShardId() {
@ -467,7 +469,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalException);
}
}

View File

@ -12,12 +12,12 @@ import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
@ -25,19 +25,17 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class TransportFollowStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
FollowStatsAction.StatsRequest,
FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> {
private final IndexNameExpressionResolver resolver;
private final CcrLicenseChecker ccrLicenseChecker;
@Inject
@ -46,7 +44,6 @@ public class TransportFollowStatsAction extends TransportTasksAction<
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver resolver,
final CcrLicenseChecker ccrLicenseChecker) {
super(
settings,
@ -57,7 +54,6 @@ public class TransportFollowStatsAction extends TransportTasksAction<
FollowStatsAction.StatsRequest::new,
FollowStatsAction.StatsResponses::new,
Ccr.CCR_THREAD_POOL_NAME);
this.resolver = Objects.requireNonNull(resolver);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}
@ -90,11 +86,19 @@ public class TransportFollowStatsAction extends TransportTasksAction<
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.collect(Collectors.toSet());
for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
operation.accept(shardFollowNodeTask);
}
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -21,8 +20,10 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
@ -48,18 +49,32 @@ public class TransportPauseFollowAction extends HandledTransportAction<PauseFoll
final ActionListener<AcknowledgedResponse> listener) {
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex());
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist"));
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());
if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
int i = 0;
for (String taskId : shardFollowTaskIds) {
final int shardId = i++;
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override

View File

@ -647,6 +647,17 @@ public class ShardChangesIT extends ESIntegTestCase {
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}
@ -666,6 +677,17 @@ public class ShardChangesIT extends ESIntegTestCase {
client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

View File

@ -56,7 +56,8 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
}
@Override

View File

@ -45,7 +45,6 @@ import static org.hamcrest.Matchers.sameInstance;
public class ShardFollowNodeTaskTests extends ESTestCase {
private Exception fatalError;
private List<long[]> shardChangesRequests;
private List<List<Translog.Operation>> bulkShardOperationRequests;
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
@ -345,7 +344,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, sameInstance(failure));
assertThat(task.getStatus().getFatalException().getRootCause(), sameInstance(failure));
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
@ -791,19 +790,13 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
/**
 * Test override that reports the task as stopped based on the local {@code stopped} flag.
 *
 * NOTE(review): the two consecutive return statements look like the before/after lines of
 * a diff hunk whose +/- markers were lost. As written, only the first return can execute;
 * confirm which one is meant to be live.
 */
@Override
protected boolean isStopped() {
return stopped.get();
// Variant that also defers to the superclass check before consulting the local flag.
return super.isStopped() || stopped.get();
}
/**
 * Test override: instead of any superclass completion handling, only sets the local
 * {@code stopped} flag to true so that the stop check of this test task sees it.
 */
@Override
public void markAsCompleted() {
stopped.set(true);
}
/**
 * Test override: stores the given exception in the test's {@code fatalError} field and
 * sets the local {@code stopped} flag to true.
 *
 * NOTE(review): this override may be on the removed side of the diff this text was taken
 * from (nearby lines stop using {@code fatalError}) — confirm before relying on it.
 *
 * @param e the failure to record
 */
@Override
public void markAsFailed(Exception e) {
fatalError = e;
stopped.set(true);
}
};
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
@ -46,7 +47,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@ -180,7 +180,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
}
@ -221,7 +222,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
});
}
@ -325,7 +327,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@ -403,7 +404,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
/**
 * Test override that reports the task as stopped based on the local {@code stopped} flag.
 *
 * NOTE(review): the two consecutive return statements look like the before/after lines of
 * a diff hunk whose +/- markers were lost. As written, only the first return can execute;
 * confirm which one is meant to be live.
 */
@Override
protected boolean isStopped() {
return stopped.get();
// Variant that also defers to the superclass check before consulting the local flag.
return super.isStopped() || stopped.get();
}
@Override
@ -411,16 +412,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
stopped.set(true);
}
/**
 * Test override: stores the given exception in {@code failureHolder} and sets the local
 * {@code stopped} flag to true.
 *
 * NOTE(review): this override may be on the removed side of the diff this text was taken
 * from — confirm before relying on it.
 *
 * @param e the failure to record
 */
@Override
public void markAsFailed(Exception e) {
failureHolder.set(e);
stopped.set(true);
}
/**
 * Test override: returns whatever exception is currently held in {@code failureHolder}.
 *
 * NOTE(review): this override may be on the removed side of the diff this text was taken
 * from — confirm before relying on it.
 *
 * @return the current value of {@code failureHolder}, which may be null if nothing was set
 */
@Override
public Exception getFailure() {
return failureHolder.get();
}
};
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@ -48,7 +49,8 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsA
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
responses.add(new FollowStatsAction.StatsResponse(status));
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);

View File

@ -130,7 +130,8 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
fetchExceptions,
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
new ElasticsearchException("fatal error"));
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(
@ -181,7 +182,8 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
+ "}"
+ "}"
+ "],"
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + ","
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
+ "}"
+ "}"));
}
@ -212,7 +214,8 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
0,
10,
fetchExceptions,
2);
2,
null);
XContentBuilder builder = jsonBuilder();
builder.value(status);
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);

View File

@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
private static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
private static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");
private static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField("time_since_last_fetch_millis");
private static final ParseField FATAL_EXCEPTION = new ParseField("fatal_exception");
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<ShardFollowNodeTaskStatus, Void> STATUS_PARSER =
@ -88,7 +89,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[21])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[22]));
(long) args[22],
(ElasticsearchException) args[23]));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
@ -121,6 +123,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_FETCH_MILLIS_FIELD);
STATUS_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
FATAL_EXCEPTION);
}
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
@ -274,6 +279,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
return timeSinceLastFetchMillis;
}
private final ElasticsearchException fatalException;
/**
 * Returns the fatal exception carried by this status.
 *
 * @return the stored {@code fatalException}; may be {@code null} (the XContent output only
 *         emits the fatal exception field when this value is non-null)
 */
public ElasticsearchException getFatalException() {
return fatalException;
}
public ShardFollowNodeTaskStatus(
final String leaderIndex,
final String followerIndex,
@ -297,7 +308,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed,
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
final long timeSinceLastFetchMillis) {
final long timeSinceLastFetchMillis,
final ElasticsearchException fatalException) {
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId;
@ -321,6 +333,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.numberOfOperationsIndexed = numberOfOperationsIndexed;
this.fetchExceptions = Objects.requireNonNull(fetchExceptions);
this.timeSinceLastFetchMillis = timeSinceLastFetchMillis;
this.fatalException = fatalException;
}
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
@ -348,6 +361,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.fetchExceptions =
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
this.timeSinceLastFetchMillis = in.readZLong();
this.fatalException = in.readException();
}
@Override
@ -386,6 +400,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
stream.writeException(value.v2());
});
out.writeZLong(timeSinceLastFetchMillis);
out.writeException(fatalException);
}
@Override
@ -451,6 +466,14 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(),
"time_since_last_fetch",
new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS));
if (fatalException != null) {
builder.field(FATAL_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, fatalException);
}
builder.endObject();
}
return builder;
}
@ -463,6 +486,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
String otherFatalExceptionMessage = that.fatalException != null ? that.fatalException.getMessage() : null;
return leaderIndex.equals(that.leaderIndex) &&
followerIndex.equals(that.followerIndex) &&
shardId == that.shardId &&
@ -490,11 +515,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
*/
fetchExceptions.keySet().equals(that.fetchExceptions.keySet()) &&
getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) &&
timeSinceLastFetchMillis == that.timeSinceLastFetchMillis;
timeSinceLastFetchMillis == that.timeSinceLastFetchMillis &&
Objects.equals(fatalExceptionMessage, otherFatalExceptionMessage);
}
@Override
public int hashCode() {
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
return Objects.hash(
leaderIndex,
followerIndex,
@ -522,7 +549,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
*/
fetchExceptions.keySet(),
getFetchExceptionMessages(this),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalExceptionMessage);
}
private static List<String> getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {