From ef9607ea0cbc2f928fd233e51ba67b392d7e81bd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 24 Aug 2018 14:21:23 -0400 Subject: [PATCH] Track fetch exceptions for shard follow tasks (#33047) This commit adds tracking and reporting for fetch exceptions. We track fetch exceptions per fetch, keeping track of up to the maximum number of concurrent fetches. With each failing fetch, we associate the from sequence number with the exception that caused the fetch. We report these in the CCR stats endpoint, and add some testing for this tracking. --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 9 +- .../xpack/ccr/action/ShardChangesAction.java | 11 ++ .../xpack/ccr/action/ShardFollowNodeTask.java | 124 ++++++++++++++---- .../ShardFollowNodeTaskRandomTests.java | 6 + .../ShardFollowNodeTaskStatusTests.java | 62 ++++++++- .../ccr/action/ShardFollowNodeTaskTests.java | 105 ++++++++++++++- .../rest-api-spec/test/ccr/stats.yml | 1 + 7 files changed, 283 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index d76af9f3c53..b00883f5c2a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -154,7 +154,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ShardFollowTask::new), // Task statuses - new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, + new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME, ShardFollowNodeTask.Status::new) ); } @@ -166,9 +166,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ShardFollowTask::fromXContent), // Task statuses - new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), - ShardFollowNodeTask.Status::fromXContent) - ); + new NamedXContentRegistry.Entry( + ShardFollowNodeTask.Status.class, + new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME), + ShardFollowNodeTask.Status::fromXContent)); } /** diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index bc63ba5944e..4eaf71f9c68 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -146,6 +146,17 @@ public class ShardChangesAction extends Action { public int hashCode() { return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes); } + + @Override + public String toString() { + return "Request{" + + "fromSeqNo=" + fromSeqNo + + ", maxOperationCount=" + maxOperationCount + + ", shardId=" + shardId + + ", maxOperationSizeInBytes=" + maxOperationSizeInBytes + + '}'; + } + } public static final class Response extends ActionResponse { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index cfc8e0fc4e7..f2b5b7b3772 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -30,20 +30,25 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Objects; import java.util.PriorityQueue; import java.util.Queue; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * The node task that fetch the write operations from a leader shard and @@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfFailedBulkOperations = 0; private long numberOfOperationsIndexed = 0; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); + private final LinkedHashMap fetchExceptions; ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler, final LongSupplier relativeTimeProvider) { @@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.relativeTimeProvider = relativeTimeProvider; this.retryTimeout = params.getRetryTimeout(); this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); + /* + * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of + * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry + * when the fetch task associated with that from sequence number succeeds. + */ + this.fetchExceptions = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > params.getMaxConcurrentReadBatches(); + } + }; } void start( @@ -224,6 +241,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { synchronized (ShardFollowNodeTask.this) { totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfSuccessfulFetches++; + fetchExceptions.remove(from); operationsReceived += response.getOperations().length; totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); } @@ -233,6 +251,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { synchronized (ShardFollowNodeTask.this) { totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfFailedFetches++; + fetchExceptions.put(from, new ElasticsearchException(e)); } handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)); }); @@ -412,12 +431,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { totalIndexTimeMillis, numberOfSuccessfulBulkOperations, numberOfFailedBulkOperations, - numberOfOperationsIndexed); + numberOfOperationsIndexed, + new TreeMap<>(fetchExceptions)); } public static class Status implements Task.Status { - public static final String NAME = "shard-follow-node-task-status"; + public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; static final ParseField SHARD_ID = new ParseField("shard_id"); static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); @@ -438,8 +458,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations"); static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations"); static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); + static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); - static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + @SuppressWarnings("unchecked") + static final ConstructingObjectParser STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME, args -> new Status( (int) args[0], (long) args[1], @@ -459,28 +481,51 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { (long) args[15], (long) args[16], (long) args[17], - (long) args[18])); + (long) args[18], + new TreeMap<>( + ((List>) args[19]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); + + public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; + + static final ConstructingObjectParser, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = + new ConstructingObjectParser<>( + FETCH_EXCEPTIONS_ENTRY_PARSER_NAME, + args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { - PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); + STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); + STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); + } + + static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); + static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception"); + + static { + FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); + FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + FETCH_EXCEPTIONS_ENTRY_EXCEPTION); } private final int shardId; @@ -597,6 +642,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { return numberOfOperationsIndexed; } + private final NavigableMap fetchExceptions; + + public NavigableMap fetchExceptions() { + return fetchExceptions; + } + Status( final int shardId, final long leaderGlobalCheckpoint, @@ -616,7 +667,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { final long totalIndexTimeMillis, final long numberOfSuccessfulBulkOperations, final long numberOfFailedBulkOperations, - final long numberOfOperationsIndexed) { + final long numberOfOperationsIndexed, + final NavigableMap fetchExceptions) { this.shardId = shardId; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderMaxSeqNo = leaderMaxSeqNo; @@ -636,6 +688,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations; this.numberOfFailedBulkOperations = numberOfFailedBulkOperations; this.numberOfOperationsIndexed = numberOfOperationsIndexed; + this.fetchExceptions = fetchExceptions; } public Status(final StreamInput in) throws IOException { @@ -658,11 +711,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.numberOfSuccessfulBulkOperations = in.readVLong(); this.numberOfFailedBulkOperations = in.readVLong(); this.numberOfOperationsIndexed = in.readVLong(); + this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException)); } @Override public String getWriteableName() { - return NAME; + return STATUS_PARSER_NAME; } @Override @@ -686,6 +740,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { out.writeVLong(numberOfSuccessfulBulkOperations); out.writeVLong(numberOfFailedBulkOperations); out.writeVLong(numberOfOperationsIndexed); + out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException); } @Override @@ -720,13 +775,30 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); + builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); + { + for (final Map.Entry entry : fetchExceptions.entrySet()) { + builder.startObject(); + { + builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); + builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); + builder.startObject(); + { + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); + } + builder.endObject(); + } + builder.endObject(); + } + } + builder.endArray(); } builder.endObject(); return builder; } public static Status fromXContent(final XContentParser parser) { - return PARSER.apply(parser, null); + return STATUS_PARSER.apply(parser, null); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index f4cd7a680f4..b96d5b47ec2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -32,6 +32,7 @@ import java.util.function.LongConsumer; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class ShardFollowNodeTaskRandomTests extends ESTestCase { @@ -54,6 +55,11 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); + final long numberOfFailedFetches = + testRun.responses.values().stream().flatMap(List::stream).filter(f -> f.exception != null).count(); + assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches)); + // the failures were able to be retried so fetch failures should have cleared + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion)); }); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 6138ba96d54..4eb42830919 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -6,11 +6,20 @@ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase { @@ -21,6 +30,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< @Override protected ShardFollowNodeTask.Status createTestInstance() { + // if you change this constructor, reflect the changes in the hand-written assertions below return new ShardFollowNodeTask.Status( randomInt(), randomNonNegativeLong(), @@ -40,7 +50,57 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong()); + randomNonNegativeLong(), + randomReadExceptions()); + } + + @Override + protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) { + assertNotSame(expectedInstance, newInstance); + assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId())); + assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint())); + assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo())); + assertThat(newInstance.followerGlobalCheckpoint(), equalTo(expectedInstance.followerGlobalCheckpoint())); + assertThat(newInstance.lastRequestedSeqNo(), equalTo(expectedInstance.lastRequestedSeqNo())); + assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads())); + assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites())); + assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites())); + assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion())); + assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis())); + assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches())); + assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches())); + assertThat(newInstance.operationsReceived(), equalTo(expectedInstance.operationsReceived())); + assertThat(newInstance.totalTransferredBytes(), equalTo(expectedInstance.totalTransferredBytes())); + assertThat(newInstance.totalIndexTimeMillis(), equalTo(expectedInstance.totalIndexTimeMillis())); + assertThat(newInstance.numberOfSuccessfulBulkOperations(), equalTo(expectedInstance.numberOfSuccessfulBulkOperations())); + assertThat(newInstance.numberOfFailedBulkOperations(), equalTo(expectedInstance.numberOfFailedBulkOperations())); + assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed())); + assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size())); + assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet())); + for (final Map.Entry entry : newInstance.fetchExceptions().entrySet()) { + // x-content loses the exception + final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey()); + assertThat(entry.getValue().getMessage(), containsString(expected.getMessage())); + assertNotNull(entry.getValue().getCause()); + assertThat( + entry.getValue().getCause(), + anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); + assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); + } + } + + @Override + protected boolean assertToXContentEquivalence() { + return false; + } + + private NavigableMap randomReadExceptions() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); + } + return readExceptions; } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 9eda637dc9d..54aef6bd3d1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -20,8 +21,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -29,6 +32,8 @@ import java.util.function.LongConsumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -39,11 +44,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private List> bulkShardOperationRequests; private BiConsumer scheduler = (delay, task) -> task.run(); + private Consumer beforeSendShardChangesRequest = status -> {}; + + private AtomicBoolean simulateResponse = new AtomicBoolean(); + private Queue readFailures; private Queue writeFailures; private Queue mappingUpdateFailures; private Queue imdVersions; + private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; + private Queue maxSeqNos; public void testCoordinateReads() { ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); @@ -169,6 +180,27 @@ public class ShardFollowNodeTaskTests extends ESTestCase { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } + imdVersions.add(1L); + leaderGlobalCheckpoints.add(63L); + maxSeqNos.add(63L); + simulateResponse.set(true); + final AtomicLong retryCounter = new AtomicLong(); + // before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear + beforeSendShardChangesRequest = status -> { + assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); + if (retryCounter.get() > 0) { + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); + } + retryCounter.incrementAndGet(); + }; task.coordinateReads(); // NUmber of requests is equal to initial request + retried attempts @@ -178,10 +210,14 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequest[1], equalTo(64L)); } - assertThat(task.isStopped(), equalTo(false)); + assertFalse("task is not stopped", task.isStopped()); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo((long)max)); + assertThat(status.numberOfSuccessfulFetches(), equalTo(1L)); + // the fetch failure has cleared + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -194,6 +230,23 @@ public class ShardFollowNodeTaskTests extends ESTestCase { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } + final AtomicLong retryCounter = new AtomicLong(); + // before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist + beforeSendShardChangesRequest = status -> { + assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); + if (retryCounter.get() > 0) { + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); + } + retryCounter.incrementAndGet(); + }; task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(11)); @@ -202,12 +255,22 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequest[1], equalTo(64L)); } - assertThat(task.isStopped(), equalTo(true)); + assertTrue("task is stopped", task.isStopped()); assertThat(fatalError, notNullValue()); assertThat(fatalError.getMessage(), containsString("retrying failed [")); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo(11L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(cause.getShardId().getId(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -216,19 +279,38 @@ public class ShardFollowNodeTaskTests extends ESTestCase { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - Exception failure = new RuntimeException(); + Exception failure = new RuntimeException("replication failed"); readFailures.add(failure); + final AtomicBoolean invoked = new AtomicBoolean(); + // since there will be only one failure, this should only be invoked once and there should not be a fetch failure + beforeSendShardChangesRequest = status -> { + if (invoked.compareAndSet(false, true)) { + assertThat(status.numberOfFailedFetches(), equalTo(0L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(0)); + } else { + fail("invoked twice"); + } + }; task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - assertThat(task.isStopped(), equalTo(true)); + assertTrue("task is stopped", task.isStopped()); assertThat(fatalError, sameInstance(failure)); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.numberOfFailedFetches(), equalTo(1L)); + assertThat(status.fetchExceptions().entrySet(), hasSize(1)); + final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo(0L)); + assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().getCause()); + assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class)); + final RuntimeException cause = (RuntimeException) entry.getValue().getCause(); + assertThat(cause.getMessage(), equalTo("replication failed")); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } @@ -642,7 +724,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase { writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); imdVersions = new LinkedList<>(); + leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); + maxSeqNos = new LinkedList<>(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -683,10 +767,23 @@ public class ShardFollowNodeTaskTests extends ESTestCase { @Override protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer handler, Consumer errorHandler) { + beforeSendShardChangesRequest.accept(getStatus()); shardChangesRequests.add(new long[]{from, requestBatchSize}); Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll(); if (readFailure != null) { errorHandler.accept(readFailure); + } else if (simulateResponse.get()) { + final Translog.Operation[] operations = new Translog.Operation[requestBatchSize]; + for (int i = 0; i < requestBatchSize; i++) { + operations[i] = new Translog.NoOp(from + i, 0, "test"); + } + final ShardChangesAction.Response response = + new ShardChangesAction.Response( + imdVersions.poll(), + leaderGlobalCheckpoints.poll(), + maxSeqNos.poll(), + operations); + handler.accept(response); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index 94af4c345fd..a38698a45be 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -43,6 +43,7 @@ - match: { bar.0.number_of_successful_bulk_operations: 0 } - match: { bar.0.number_of_failed_bulk_operations: 0 } - match: { bar.0.number_of_operations_indexed: 0 } + - length: { bar.0.fetch_exceptions: 0 } - do: ccr.unfollow_index: