diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f88f21e4072..7faebfdc26c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; @@ -36,6 +37,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * The node task that fetch the write operations from a leader shard and @@ -72,7 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfOperationsIndexed = 0; private long lastFetchTime = -1; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); - private final LinkedHashMap fetchExceptions; + private final LinkedHashMap> fetchExceptions; ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler, final LongSupplier relativeTimeProvider) { @@ -87,9 +89,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry * when the fetch task associated with that from sequence number succeeds. */ - this.fetchExceptions = new LinkedHashMap() { + this.fetchExceptions = new LinkedHashMap>() { @Override - protected boolean removeEldestEntry(final Map.Entry eldest) { + protected boolean removeEldestEntry(final Map.Entry> eldest) { return size() > params.getMaxConcurrentReadBatches(); } }; @@ -240,7 +242,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { synchronized (ShardFollowNodeTask.this) { totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfFailedFetches++; - fetchExceptions.put(from, new ElasticsearchException(e)); + fetchExceptions.put(from, Tuple.tuple(retryCounter, new ElasticsearchException(e))); } handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)); }); @@ -438,7 +440,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { numberOfSuccessfulBulkOperations, numberOfFailedBulkOperations, numberOfOperationsIndexed, - new TreeMap<>(fetchExceptions), + new TreeMap<>( + fetchExceptions + .entrySet() + .stream() + .collect( + Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))), timeSinceLastFetchMillis); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index d5f2ab7ea08..48e4359ccb5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -83,15 +84,17 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed())); assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size())); assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet())); - for (final Map.Entry entry : newInstance.fetchExceptions().entrySet()) { + for (final Map.Entry> entry : newInstance.fetchExceptions().entrySet()) { + final Tuple expectedTuple = expectedInstance.fetchExceptions().get(entry.getKey()); + assertThat(entry.getValue().v1(), equalTo(expectedTuple.v1())); // x-content loses the exception - final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey()); - assertThat(entry.getValue().getMessage(), containsString(expected.getMessage())); - assertNotNull(entry.getValue().getCause()); + final ElasticsearchException expected = expectedTuple.v2(); + assertThat(entry.getValue().v2().getMessage(), containsString(expected.getMessage())); + assertNotNull(entry.getValue().v2().getCause()); assertThat( - entry.getValue().getCause(), + entry.getValue().v2().getCause(), anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); - assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); + assertThat(entry.getValue().v2().getCause().getMessage(), containsString(expected.getCause().getMessage())); } assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis())); } @@ -101,11 +104,15 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< return false; } - private NavigableMap randomReadExceptions() { + private NavigableMap> randomReadExceptions() { final int count = randomIntBetween(0, 16); - final NavigableMap readExceptions = new TreeMap<>(); + final NavigableMap> readExceptions = new TreeMap<>(); for (int i = 0; i < count; i++) { - readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); + readExceptions.put( + randomNonNegativeLong(), + Tuple.tuple( + randomIntBetween(0, Integer.MAX_VALUE), + new ElasticsearchException(new IllegalStateException("index [" + i + "]")))); } return readExceptions; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 101b2580759..71a97bf8207 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -192,12 +193,13 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); if (retryCounter.get() > 0) { assertThat(status.fetchExceptions().entrySet(), hasSize(1)); - final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + final Map.Entry> entry = status.fetchExceptions().entrySet().iterator().next(); + assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get()))); assertThat(entry.getKey(), equalTo(0L)); - assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); - assertNotNull(entry.getValue().getCause()); - assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); - final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); + assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().v2().getCause()); + assertThat(entry.getValue().v2().getCause(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().v2().getCause(); assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); assertThat(cause.getShardId().getId(), equalTo(0)); } @@ -253,12 +255,12 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.numberOfFailedFetches(), equalTo(1L)); assertThat(status.fetchExceptions().entrySet(), hasSize(1)); - final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); + final Map.Entry> entry = status.fetchExceptions().entrySet().iterator().next(); assertThat(entry.getKey(), equalTo(0L)); - assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); - assertNotNull(entry.getValue().getCause()); - assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class)); - final RuntimeException cause = (RuntimeException) entry.getValue().getCause(); + assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class)); + assertNotNull(entry.getValue().v2().getCause()); + assertThat(entry.getValue().v2().getCause(), instanceOf(RuntimeException.class)); + final RuntimeException cause = (RuntimeException) entry.getValue().v2().getCause(); assertThat(cause.getMessage(), equalTo("replication failed")); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index dafb4a5e29f..a8193c35a8d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.ccr; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -84,17 +85,17 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[19], (long) args[20], new TreeMap<>( - ((List>) args[21]) + ((List>>) args[21]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), (long) args[22])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; - static final ConstructingObjectParser, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = + static final ConstructingObjectParser>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = new ConstructingObjectParser<>( FETCH_EXCEPTIONS_ENTRY_PARSER_NAME, - args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); + args -> new AbstractMap.SimpleEntry<>((long) args[0], Tuple.tuple((Integer)args[1], (ElasticsearchException)args[2]))); static { STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); @@ -123,10 +124,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); + static final ParseField FETCH_EXCEPTIONS_RETRIES = new ParseField("retries"); static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception"); static { FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); + FETCH_EXCEPTIONS_ENTRY_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_RETRIES); FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject( ConstructingObjectParser.constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), @@ -259,9 +262,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status { return numberOfOperationsIndexed; } - private final NavigableMap fetchExceptions; + private final NavigableMap> fetchExceptions; - public NavigableMap fetchExceptions() { + public NavigableMap> fetchExceptions() { return fetchExceptions; } @@ -293,7 +296,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { final long numberOfSuccessfulBulkOperations, final long numberOfFailedBulkOperations, final long numberOfOperationsIndexed, - final NavigableMap fetchExceptions, + final NavigableMap> fetchExceptions, final long timeSinceLastFetchMillis) { this.leaderIndex = leaderIndex; this.followerIndex = followerIndex; @@ -342,7 +345,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfSuccessfulBulkOperations = in.readVLong(); this.numberOfFailedBulkOperations = in.readVLong(); this.numberOfOperationsIndexed = in.readVLong(); - this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException)); + this.fetchExceptions = + new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException()))); this.timeSinceLastFetchMillis = in.readZLong(); } @@ -374,7 +378,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status { out.writeVLong(numberOfSuccessfulBulkOperations); out.writeVLong(numberOfFailedBulkOperations); out.writeVLong(numberOfOperationsIndexed); - out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException); + out.writeMap( + fetchExceptions, + StreamOutput::writeVLong, + (stream, value) -> { stream.writeVInt(value.v1()); stream.writeException(value.v2()); }); out.writeZLong(timeSinceLastFetchMillis); } @@ -421,14 +428,15 @@ public class ShardFollowNodeTaskStatus implements Task.Status { builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); { - for (final Map.Entry entry : fetchExceptions.entrySet()) { + for (final Map.Entry> entry : fetchExceptions.entrySet()) { builder.startObject(); { builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); + builder.field(FETCH_EXCEPTIONS_RETRIES.getPreferredName(), entry.getValue().v1()); builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); builder.startObject(); { - ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue().v2()); } builder.endObject(); } @@ -515,7 +523,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { } private static List getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) { - return status.fetchExceptions().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList()); + return status.fetchExceptions().values().stream().map(t -> t.v2().getMessage()).collect(Collectors.toList()); } public String toString() { diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 83c9fe70e11..444f15912e6 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -987,6 +987,9 @@ "from_seq_no": { "type": "long" }, + "retries": { + "type": "integer" + }, "exception": { "type": "text" } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java index ed893410c88..9124e1d5245 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -101,8 +102,10 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase fetchExceptions = - new TreeMap<>(Collections.singletonMap(randomNonNegativeLong(), new ElasticsearchException("shard is sad"))); + final NavigableMap> fetchExceptions = + new TreeMap<>(Collections.singletonMap( + randomNonNegativeLong(), + Tuple.tuple(randomIntBetween(0, Integer.MAX_VALUE), new ElasticsearchException("shard is sad")))); final long timeSinceLastFetchMillis = randomNonNegativeLong(); final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus( "cluster_alias:leader_index", @@ -171,6 +174,7 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase fetchExceptions = - new TreeMap<>(Collections.singletonMap(1L, new ElasticsearchException("shard is sad"))); + final NavigableMap> fetchExceptions = + new TreeMap<>(Collections.singletonMap(1L, Tuple.tuple(2, new ElasticsearchException("shard is sad")))); final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus( "cluster_alias:leader_index", "follower_index", @@ -234,7 +238,9 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase) fieldMapping.get("properties")).size(), equalTo(3)); assertThat(XContentMapValues.extractValue("properties.from_seq_no.type", fieldMapping), equalTo("long")); + assertThat(XContentMapValues.extractValue("properties.retries.type", fieldMapping), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.exception.type", fieldMapping), equalTo("text")); } else { fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");