Track fetch exceptions for shard follow tasks (#33047)

This commit adds tracking and reporting for fetch exceptions. We track
fetch exceptions per fetch, keeping track of up to the maximum number of
concurrent fetches. With each failing fetch, we associate the from
sequence number with the exception that caused the fetch. We report
these in the CCR stats endpoint, and add some testing for this tracking.
This commit is contained in:
Jason Tedor 2018-08-24 14:21:23 -04:00 committed by GitHub
parent 7fa8a728c4
commit ef9607ea0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 283 additions and 35 deletions

View File

@ -154,7 +154,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
ShardFollowTask::new), ShardFollowTask::new),
// Task statuses // Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME,
ShardFollowNodeTask.Status::new) ShardFollowNodeTask.Status::new)
); );
} }
@ -166,9 +166,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
ShardFollowTask::fromXContent), ShardFollowTask::fromXContent),
// Task statuses // Task statuses
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), new NamedXContentRegistry.Entry(
ShardFollowNodeTask.Status::fromXContent) ShardFollowNodeTask.Status.class,
); new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME),
ShardFollowNodeTask.Status::fromXContent));
} }
/** /**

View File

@ -146,6 +146,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
public int hashCode() { public int hashCode() {
return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes); return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
} }
@Override
public String toString() {
    // Render the request fields in a fixed, human-readable form for logs and test failures.
    // Field order mirrors the constructor/serialization order of the request.
    final StringBuilder description = new StringBuilder("Request{");
    description.append("fromSeqNo=").append(fromSeqNo);
    description.append(", maxOperationCount=").append(maxOperationCount);
    description.append(", shardId=").append(shardId);
    description.append(", maxOperationSizeInBytes=").append(maxOperationSizeInBytes);
    description.append('}');
    return description.toString();
}
} }
public static final class Response extends ActionResponse { public static final class Response extends ActionResponse {

View File

@ -30,20 +30,25 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.io.IOException; import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects; import java.util.Objects;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongConsumer; import java.util.function.LongConsumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.stream.Collectors;
/** /**
* The node task that fetch the write operations from a leader shard and * The node task that fetch the write operations from a leader shard and
@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfFailedBulkOperations = 0; private long numberOfFailedBulkOperations = 0;
private long numberOfOperationsIndexed = 0; private long numberOfOperationsIndexed = 0;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers, ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) { ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.relativeTimeProvider = relativeTimeProvider; this.relativeTimeProvider = relativeTimeProvider;
this.retryTimeout = params.getRetryTimeout(); this.retryTimeout = params.getRetryTimeout();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
* when the fetch task associated with that from sequence number succeeds.
*/
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
return size() > params.getMaxConcurrentReadBatches();
}
};
} }
void start( void start(
@ -224,6 +241,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
synchronized (ShardFollowNodeTask.this) { synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++; numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length; operationsReceived += response.getOperations().length;
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
} }
@ -233,6 +251,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
synchronized (ShardFollowNodeTask.this) { synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedFetches++; numberOfFailedFetches++;
fetchExceptions.put(from, new ElasticsearchException(e));
} }
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)); handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
}); });
@ -412,12 +431,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
totalIndexTimeMillis, totalIndexTimeMillis,
numberOfSuccessfulBulkOperations, numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations, numberOfFailedBulkOperations,
numberOfOperationsIndexed); numberOfOperationsIndexed,
new TreeMap<>(fetchExceptions));
} }
public static class Status implements Task.Status { public static class Status implements Task.Status {
public static final String NAME = "shard-follow-node-task-status"; public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
static final ParseField SHARD_ID = new ParseField("shard_id"); static final ParseField SHARD_ID = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
@ -438,8 +458,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations"); static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations");
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations"); static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations");
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");
static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME, @SuppressWarnings("unchecked")
static final ConstructingObjectParser<Status, Void> STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME,
args -> new Status( args -> new Status(
(int) args[0], (int) args[0],
(long) args[1], (long) args[1],
@ -459,28 +481,51 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
(long) args[15], (long) args[15],
(long) args[16], (long) args[16],
(long) args[17], (long) args[17],
(long) args[18])); (long) args[18],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[19])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
new ConstructingObjectParser<>(
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
static { static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
}
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");
static {
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
} }
private final int shardId; private final int shardId;
@ -597,6 +642,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return numberOfOperationsIndexed; return numberOfOperationsIndexed;
} }
private final NavigableMap<Long, ElasticsearchException> fetchExceptions;
public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
return fetchExceptions;
}
Status( Status(
final int shardId, final int shardId,
final long leaderGlobalCheckpoint, final long leaderGlobalCheckpoint,
@ -616,7 +667,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
final long totalIndexTimeMillis, final long totalIndexTimeMillis,
final long numberOfSuccessfulBulkOperations, final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations, final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed) { final long numberOfOperationsIndexed,
final NavigableMap<Long, ElasticsearchException> fetchExceptions) {
this.shardId = shardId; this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo; this.leaderMaxSeqNo = leaderMaxSeqNo;
@ -636,6 +688,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations; this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations;
this.numberOfFailedBulkOperations = numberOfFailedBulkOperations; this.numberOfFailedBulkOperations = numberOfFailedBulkOperations;
this.numberOfOperationsIndexed = numberOfOperationsIndexed; this.numberOfOperationsIndexed = numberOfOperationsIndexed;
this.fetchExceptions = fetchExceptions;
} }
public Status(final StreamInput in) throws IOException { public Status(final StreamInput in) throws IOException {
@ -658,11 +711,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.numberOfSuccessfulBulkOperations = in.readVLong(); this.numberOfSuccessfulBulkOperations = in.readVLong();
this.numberOfFailedBulkOperations = in.readVLong(); this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong(); this.numberOfOperationsIndexed = in.readVLong();
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
} }
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return NAME; return STATUS_PARSER_NAME;
} }
@Override @Override
@ -686,6 +740,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
out.writeVLong(numberOfSuccessfulBulkOperations); out.writeVLong(numberOfSuccessfulBulkOperations);
out.writeVLong(numberOfFailedBulkOperations); out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed); out.writeVLong(numberOfOperationsIndexed);
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
} }
@Override @Override
@ -720,13 +775,30 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
{
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
builder.startObject();
{
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
}
builder.endObject();
}
builder.endObject();
}
}
builder.endArray();
} }
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public static Status fromXContent(final XContentParser parser) { public static Status fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null); return STATUS_PARSER.apply(parser, null);
} }
@Override @Override

View File

@ -32,6 +32,7 @@ import java.util.function.LongConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class ShardFollowNodeTaskRandomTests extends ESTestCase { public class ShardFollowNodeTaskRandomTests extends ESTestCase {
@ -54,6 +55,11 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
ShardFollowNodeTask.Status status = task.getStatus(); ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
final long numberOfFailedFetches =
testRun.responses.values().stream().flatMap(List::stream).filter(f -> f.exception != null).count();
assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches));
// the failures were able to be retried so fetch failures should have cleared
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion)); assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
}); });

View File

@ -6,11 +6,20 @@
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> { public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> {
@ -21,6 +30,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
@Override @Override
protected ShardFollowNodeTask.Status createTestInstance() { protected ShardFollowNodeTask.Status createTestInstance() {
// if you change this constructor, reflect the changes in the hand-written assertions below
return new ShardFollowNodeTask.Status( return new ShardFollowNodeTask.Status(
randomInt(), randomInt(),
randomNonNegativeLong(), randomNonNegativeLong(),
@ -40,7 +50,57 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong()); randomNonNegativeLong(),
randomReadExceptions());
}
@Override
// Hand-written field-by-field comparison: the default equals-based check cannot be used
// because exceptions do not round-trip through x-content with object equality intact.
protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) {
assertNotSame(expectedInstance, newInstance);
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));
assertThat(newInstance.followerGlobalCheckpoint(), equalTo(expectedInstance.followerGlobalCheckpoint()));
// fix: followerMaxSeqNo is serialized with the status (FOLLOWER_MAX_SEQ_NO_FIELD) but was
// previously not asserted here, leaving that field unverified by the round-trip test
assertThat(newInstance.followerMaxSeqNo(), equalTo(expectedInstance.followerMaxSeqNo()));
assertThat(newInstance.lastRequestedSeqNo(), equalTo(expectedInstance.lastRequestedSeqNo()));
assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads()));
assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites()));
assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites()));
assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion()));
assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis()));
assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches()));
assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches()));
assertThat(newInstance.operationsReceived(), equalTo(expectedInstance.operationsReceived()));
assertThat(newInstance.totalTransferredBytes(), equalTo(expectedInstance.totalTransferredBytes()));
assertThat(newInstance.totalIndexTimeMillis(), equalTo(expectedInstance.totalIndexTimeMillis()));
assertThat(newInstance.numberOfSuccessfulBulkOperations(), equalTo(expectedInstance.numberOfSuccessfulBulkOperations()));
assertThat(newInstance.numberOfFailedBulkOperations(), equalTo(expectedInstance.numberOfFailedBulkOperations()));
assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed()));
// fetch exceptions: same tracked from-seq-nos, but the exception objects themselves only
// survive serialization approximately, so compare messages/causes rather than instances
assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size()));
assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet()));
for (final Map.Entry<Long, ElasticsearchException> entry : newInstance.fetchExceptions().entrySet()) {
// x-content loses the exception
final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey());
assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().getCause());
assertThat(
entry.getValue().getCause(),
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
}
}
// X-content equivalence is disabled for this test case: serializing the tracked fetch
// exceptions through x-content does not preserve the exception objects exactly (see the
// "x-content loses the exception" handling in assertEqualInstances above), so a
// byte-for-byte x-content comparison would spuriously fail.
@Override
protected boolean assertToXContentEquivalence() {
return false;
}
private NavigableMap<Long, ElasticsearchException> randomReadExceptions() {
final int count = randomIntBetween(0, 16);
final NavigableMap<Long, ElasticsearchException> readExceptions = new TreeMap<>();
for (int i = 0; i < count; i++) {
readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
}
return readExceptions;
} }
@Override @Override

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -20,8 +21,10 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongConsumer; import java.util.function.LongConsumer;
@ -29,6 +32,8 @@ import java.util.function.LongConsumer;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
@ -39,11 +44,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private List<List<Translog.Operation>> bulkShardOperationRequests; private List<List<Translog.Operation>> bulkShardOperationRequests;
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run(); private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
private Consumer<ShardFollowNodeTask.Status> beforeSendShardChangesRequest = status -> {};
private AtomicBoolean simulateResponse = new AtomicBoolean();
private Queue<Exception> readFailures; private Queue<Exception> readFailures;
private Queue<Exception> writeFailures; private Queue<Exception> writeFailures;
private Queue<Exception> mappingUpdateFailures; private Queue<Exception> mappingUpdateFailures;
private Queue<Long> imdVersions; private Queue<Long> imdVersions;
private Queue<Long> leaderGlobalCheckpoints;
private Queue<Long> followerGlobalCheckpoints; private Queue<Long> followerGlobalCheckpoints;
private Queue<Long> maxSeqNos;
public void testCoordinateReads() { public void testCoordinateReads() {
ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
@ -169,6 +180,27 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
for (int i = 0; i < max; i++) { for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
} }
imdVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
maxSeqNos.add(63L);
simulateResponse.set(true);
final AtomicLong retryCounter = new AtomicLong();
// before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear
beforeSendShardChangesRequest = status -> {
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
if (retryCounter.get() > 0) {
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
}
retryCounter.incrementAndGet();
};
task.coordinateReads(); task.coordinateReads();
// Number of requests is equal to initial request + retried attempts // Number of requests is equal to initial request + retried attempts
@ -178,10 +210,14 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequest[1], equalTo(64L)); assertThat(shardChangesRequest[1], equalTo(64L));
} }
assertThat(task.isStopped(), equalTo(false)); assertFalse("task is not stopped", task.isStopped());
ShardFollowNodeTask.Status status = task.getStatus(); ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo((long)max));
assertThat(status.numberOfSuccessfulFetches(), equalTo(1L));
// the fetch failure has cleared
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
} }
@ -194,6 +230,23 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
for (int i = 0; i < max; i++) { for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
} }
final AtomicLong retryCounter = new AtomicLong();
// before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist
beforeSendShardChangesRequest = status -> {
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
if (retryCounter.get() > 0) {
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
}
retryCounter.incrementAndGet();
};
task.coordinateReads(); task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(11)); assertThat(shardChangesRequests.size(), equalTo(11));
@ -202,12 +255,22 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequest[1], equalTo(64L)); assertThat(shardChangesRequest[1], equalTo(64L));
} }
assertThat(task.isStopped(), equalTo(true)); assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, notNullValue()); assertThat(fatalError, notNullValue());
assertThat(fatalError.getMessage(), containsString("retrying failed [")); assertThat(fatalError.getMessage(), containsString("retrying failed ["));
ShardFollowNodeTask.Status status = task.getStatus(); ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(11L));
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
} }
@ -216,19 +279,38 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1); startTask(task, 63, -1);
Exception failure = new RuntimeException(); Exception failure = new RuntimeException("replication failed");
readFailures.add(failure); readFailures.add(failure);
final AtomicBoolean invoked = new AtomicBoolean();
// since there will be only one failure, this should only be invoked once and there should not be a fetch failure
beforeSendShardChangesRequest = status -> {
if (invoked.compareAndSet(false, true)) {
assertThat(status.numberOfFailedFetches(), equalTo(0L));
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
} else {
fail("invoked twice");
}
};
task.coordinateReads(); task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
assertThat(task.isStopped(), equalTo(true)); assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, sameInstance(failure)); assertThat(fatalError, sameInstance(failure));
ShardFollowNodeTask.Status status = task.getStatus(); ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(1L));
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class));
final RuntimeException cause = (RuntimeException) entry.getValue().getCause();
assertThat(cause.getMessage(), equalTo("replication failed"));
assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
} }
@ -642,7 +724,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
writeFailures = new LinkedList<>(); writeFailures = new LinkedList<>();
mappingUpdateFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>();
imdVersions = new LinkedList<>(); imdVersions = new LinkedList<>();
leaderGlobalCheckpoints = new LinkedList<>();
followerGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>();
maxSeqNos = new LinkedList<>();
return new ShardFollowNodeTask( return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@ -683,10 +767,23 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
@Override @Override
protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler, protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) { Consumer<Exception> errorHandler) {
beforeSendShardChangesRequest.accept(getStatus());
shardChangesRequests.add(new long[]{from, requestBatchSize}); shardChangesRequests.add(new long[]{from, requestBatchSize});
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll(); Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
if (readFailure != null) { if (readFailure != null) {
errorHandler.accept(readFailure); errorHandler.accept(readFailure);
} else if (simulateResponse.get()) {
final Translog.Operation[] operations = new Translog.Operation[requestBatchSize];
for (int i = 0; i < requestBatchSize; i++) {
operations[i] = new Translog.NoOp(from + i, 0, "test");
}
final ShardChangesAction.Response response =
new ShardChangesAction.Response(
imdVersions.poll(),
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
operations);
handler.accept(response);
} }
} }

View File

@ -43,6 +43,7 @@
- match: { bar.0.number_of_successful_bulk_operations: 0 } - match: { bar.0.number_of_successful_bulk_operations: 0 }
- match: { bar.0.number_of_failed_bulk_operations: 0 } - match: { bar.0.number_of_failed_bulk_operations: 0 }
- match: { bar.0.number_of_operations_indexed: 0 } - match: { bar.0.number_of_operations_indexed: 0 }
- length: { bar.0.fetch_exceptions: 0 }
- do: - do:
ccr.unfollow_index: ccr.unfollow_index: