diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 80d6ed4cb4a..00e3aaaae2a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -67,6 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + private final String leaderIndex; private final ShardFollowTask params; private final TimeValue retryTimeout; private final TimeValue idleShardChangesRequestDelay; @@ -90,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfSuccessfulBulkOperations = 0; private long numberOfFailedBulkOperations = 0; private long numberOfOperationsIndexed = 0; + private long lastFetchTime = -1; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); private final LinkedHashMap fetchExceptions; @@ -112,6 +114,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { return size() > params.getMaxConcurrentReadBatches(); } }; + + if (params.getLeaderClusterAlias() != null) { + leaderIndex = params.getLeaderClusterAlias() + ":" + params.getLeaderShardId().getIndexName(); + } else { + leaderIndex = params.getLeaderShardId().getIndexName(); + } } void start( @@ -235,6 +243,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) { final long startTime = relativeTimeProvider.getAsLong(); + synchronized (this) { + lastFetchTime = startTime; + } innerSendShardChangesRequest(from, maxOperationCount, response -> { synchronized (ShardFollowNodeTask.this) { @@ -411,7 +422,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { @Override public synchronized Status getStatus() { + final long timeSinceLastFetchMillis; + if (lastFetchTime != -1) { + timeSinceLastFetchMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - lastFetchTime); + } else { + // To avoid confusion when ccr didn't yet execute a fetch: + timeSinceLastFetchMillis = -1; + } return new Status( + leaderIndex, getFollowShardId().getId(), leaderGlobalCheckpoint, leaderMaxSeqNo, @@ -431,13 +450,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { numberOfSuccessfulBulkOperations, numberOfFailedBulkOperations, numberOfOperationsIndexed, - new TreeMap<>(fetchExceptions)); + new TreeMap<>(fetchExceptions), + timeSinceLastFetchMillis); } public static class Status implements Task.Status { public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; + static final ParseField LEADER_INDEX = new ParseField("leader_index"); static final ParseField SHARD_ID = new ParseField("shard_id"); static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no"); @@ -458,20 +479,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations"); static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); + static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField("time_since_last_fetch_millis"); @SuppressWarnings("unchecked") static final ConstructingObjectParser STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME, args -> new Status( - (int) args[0], - (long) args[1], + (String) args[0], + (int) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], - (int) args[6], + (long) args[6], (int) args[7], (int) args[8], - (long) args[9], + (int) args[9], (long) args[10], (long) args[11], (long) args[12], @@ -481,10 +503,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { (long) args[16], (long) args[17], (long) args[18], + (long) args[19], new TreeMap<>( - ((List>) args[19]) + ((List>) args[20]) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + (long) args[21])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -494,6 +518,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); static { + STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); @@ -514,6 +539,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_FETCH_MILLIS_FIELD); } static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); @@ -527,6 +553,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { FETCH_EXCEPTIONS_ENTRY_EXCEPTION); } + private final String leaderIndex; + + public String leaderIndex() { + return leaderIndex; + } + private final int shardId; public int getShardId() { @@ -647,7 +679,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { return fetchExceptions; } + private final long timeSinceLastFetchMillis; + + public long timeSinceLastFetchMillis() { + return timeSinceLastFetchMillis; + } + Status( + final String leaderIndex, final int shardId, final long leaderGlobalCheckpoint, final long leaderMaxSeqNo, @@ -667,7 +706,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { final long numberOfSuccessfulBulkOperations, final long numberOfFailedBulkOperations, final long numberOfOperationsIndexed, - final NavigableMap fetchExceptions) { + final NavigableMap fetchExceptions, + final long timeSinceLastFetchMillis) { + this.leaderIndex = leaderIndex; this.shardId = shardId; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderMaxSeqNo = leaderMaxSeqNo; @@ -688,9 +729,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.numberOfFailedBulkOperations = numberOfFailedBulkOperations; this.numberOfOperationsIndexed = numberOfOperationsIndexed; this.fetchExceptions = Objects.requireNonNull(fetchExceptions); + this.timeSinceLastFetchMillis = timeSinceLastFetchMillis; } public Status(final StreamInput in) throws IOException { + this.leaderIndex = in.readString(); this.shardId = in.readVInt(); this.leaderGlobalCheckpoint = in.readZLong(); this.leaderMaxSeqNo = in.readZLong(); @@ -711,6 +754,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.numberOfFailedBulkOperations = in.readVLong(); this.numberOfOperationsIndexed = in.readVLong(); this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException)); + this.timeSinceLastFetchMillis = in.readZLong(); } @Override @@ -720,6 +764,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { @Override public void writeTo(final StreamOutput out) throws IOException { + out.writeString(leaderIndex); out.writeVInt(shardId); out.writeZLong(leaderGlobalCheckpoint); out.writeZLong(leaderMaxSeqNo); @@ -740,12 +785,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { out.writeVLong(numberOfFailedBulkOperations); out.writeVLong(numberOfOperationsIndexed); out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException); + out.writeZLong(timeSinceLastFetchMillis); } @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); { + builder.field(LEADER_INDEX.getPreferredName(), leaderIndex); builder.field(SHARD_ID.getPreferredName(), shardId); builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo); @@ -791,6 +838,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } } builder.endArray(); + builder.humanReadableField( + TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(), + "time_since_last_fetch", + new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS)); } builder.endObject(); return builder; @@ -805,7 +856,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final Status that = (Status) o; - return shardId == that.shardId && + return leaderIndex.equals(that.leaderIndex) && + shardId == that.shardId && leaderGlobalCheckpoint == that.leaderGlobalCheckpoint && leaderMaxSeqNo == that.leaderMaxSeqNo && followerGlobalCheckpoint == that.followerGlobalCheckpoint && @@ -829,12 +881,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { * keys. */ fetchExceptions.keySet().equals(that.fetchExceptions.keySet()) && - getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)); + getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) && + timeSinceLastFetchMillis == that.timeSinceLastFetchMillis; } @Override public int hashCode() { return Objects.hash( + leaderIndex, shardId, leaderGlobalCheckpoint, leaderMaxSeqNo, @@ -858,7 +912,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { * messages. Note that we are relying on the fact that the fetch exceptions are ordered by keys. */ fetchExceptions.keySet(), - getFetchExceptionMessages(this)); + getFetchExceptionMessages(this), + timeSinceLastFetchMillis); } private static List getFetchExceptionMessages(final Status status) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 234b7334e64..8368a818e00 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -32,6 +32,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< protected ShardFollowNodeTask.Status createTestInstance() { // if you change this constructor, reflect the changes in the hand-written assertions below return new ShardFollowNodeTask.Status( + randomAlphaOfLength(4), randomInt(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -51,12 +52,14 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions()); + randomReadExceptions(), + randomLong()); } @Override protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) { assertNotSame(expectedInstance, newInstance); + assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex())); assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId())); assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint())); assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo())); @@ -87,6 +90,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); } + assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis())); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index c64cbe7690f..431629b1d23 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -19,11 +19,15 @@ index: bar body: leader_index: foo + - is_true: follow_index_created + - is_true: follow_index_shards_acked + - is_true: index_following_started # we can not reliably wait for replication to occur so we test the endpoint without indexing any documents - do: ccr.stats: index: bar + - match: { bar.0.leader_index: "foo" } - match: { bar.0.shard_id: 0 } - gte: { bar.0.leader_global_checkpoint: -1 } - gte: { bar.0.leader_max_seq_no: -1 } @@ -44,6 +48,7 @@ - match: { bar.0.number_of_failed_bulk_operations: 0 } - match: { bar.0.number_of_operations_indexed: 0 } - length: { bar.0.fetch_exceptions: 0 } + - gte: { bar.0.time_since_last_fetch_millis: -1 } - do: ccr.unfollow_index: