diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index d1a90a6ccec..43b16727aac 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -82,7 +82,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex); assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); assertThat(countCcrNodeTasks(), equalTo(1)); - assertBusy(() -> verifyCcrMonitoring(allowedIndex)); + assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex)); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow"))); // Make sure that there are no other ccr relates operations running: assertBusy(() -> { @@ -206,7 +206,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); } - private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { + private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException { ensureYellow(".monitoring-*"); Request request = new Request("GET", "/.monitoring-*/_search"); @@ -222,7 +222,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase { for (int i = 0; i < hits.size(); i++) { Map hit = (Map) hits.get(i); String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); - assertThat(leaderIndex, endsWith(leaderIndex)); + assertThat(leaderIndex, endsWith(expectedLeaderIndex)); + + final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit); + assertThat(followerIndex, equalTo(expectedFollowerIndex)); int foundNumberOfOperationsReceived = (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index ccb5e409e8c..5c1c3915044 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -77,7 +77,7 @@ public class FollowIndexIT extends ESRestTestCase { index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true"); } assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); - assertBusy(() -> verifyCcrMonitoring(leaderIndexName)); + assertBusy(() -> verifyCcrMonitoring(leaderIndexName, followIndexName)); } } @@ -107,7 +107,7 @@ public class FollowIndexIT extends ESRestTestCase { ensureYellow("logs-20190101"); verifyDocuments("logs-20190101", 5); }); - assertBusy(() -> verifyCcrMonitoring("logs-20190101")); + assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101")); } private static void index(RestClient client, String index, String id, Object... fields) throws IOException { @@ -159,7 +159,7 @@ public class FollowIndexIT extends ESRestTestCase { } } - private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { + private static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException { ensureYellow(".monitoring-*"); Request request = new Request("GET", "/.monitoring-*/_search"); @@ -175,7 +175,10 @@ public class FollowIndexIT extends ESRestTestCase { for (int i = 0; i < hits.size(); i++) { Map hit = (Map) hits.get(i); String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); - assertThat(leaderIndex, endsWith(leaderIndex)); + assertThat(leaderIndex, endsWith(expectedLeaderIndex)); + + final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit); + assertThat(followerIndex, equalTo(expectedFollowerIndex)); int foundNumberOfOperationsReceived = (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index c221c097977..f88f21e4072 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -418,6 +418,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } return new ShardFollowNodeTaskStatus( leaderIndex, + params.getFollowShardId().getIndexName(), getFollowShardId().getId(), leaderGlobalCheckpoint, leaderMaxSeqNo, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java index f227a56f158..394b42789d1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java @@ -106,7 +106,7 @@ public class TransportCcrStatsAction extends TransportTasksAction< final CcrStatsAction.StatsRequest request, final ShardFollowNodeTask task, final ActionListener listener) { - listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus())); + listener.onResponse(new CcrStatsAction.StatsResponse(task.getStatus())); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 2f145e7a98c..d5f2ab7ea08 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -33,6 +33,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< protected ShardFollowNodeTaskStatus createTestInstance() { // if you change this constructor, reflect the changes in the hand-written assertions below return new ShardFollowNodeTaskStatus( + randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(), randomNonNegativeLong(), @@ -61,6 +62,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) { assertNotSame(expectedInstance, newInstance); assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex())); + assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex())); assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId())); assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint())); assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 2f3c4efb9ad..dafb4a5e29f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -34,6 +34,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; private static final ParseField LEADER_INDEX = new ParseField("leader_index"); + private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index"); private static final ParseField SHARD_ID = new ParseField("shard_id"); private static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); private static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no"); @@ -62,16 +63,16 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER_NAME, args -> new ShardFollowNodeTaskStatus( (String) args[0], - (int) args[1], - (long) args[2], + (String) args[1], + (int) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], - (int) args[7], + (long) args[7], (int) args[8], (int) args[9], - (long) args[10], + (int) args[10], (long) args[11], (long) args[12], (long) args[13], @@ -81,11 +82,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[17], (long) args[18], (long) args[19], + (long) args[20], new TreeMap<>( - ((List>) args[20]) + ((List>) args[21]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[21])); + (long) args[22])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -96,6 +98,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { static { STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); + STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); @@ -136,6 +139,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { return leaderIndex; } + private final String followerIndex; + + public String followerIndex() { + return followerIndex; + } + private final int shardId; public int getShardId() { @@ -264,6 +273,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { public ShardFollowNodeTaskStatus( final String leaderIndex, + final String followerIndex, final int shardId, final long leaderGlobalCheckpoint, final long leaderMaxSeqNo, @@ -286,6 +296,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { final NavigableMap fetchExceptions, final long timeSinceLastFetchMillis) { this.leaderIndex = leaderIndex; + this.followerIndex = followerIndex; this.shardId = shardId; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderMaxSeqNo = leaderMaxSeqNo; @@ -311,6 +322,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.leaderIndex = in.readString(); + this.followerIndex = in.readString(); this.shardId = in.readVInt(); this.leaderGlobalCheckpoint = in.readZLong(); this.leaderMaxSeqNo = in.readZLong(); @@ -342,6 +354,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { @Override public void writeTo(final StreamOutput out) throws IOException { out.writeString(leaderIndex); + out.writeString(followerIndex); out.writeVInt(shardId); out.writeZLong(leaderGlobalCheckpoint); out.writeZLong(leaderMaxSeqNo); @@ -377,6 +390,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { builder.field(LEADER_INDEX.getPreferredName(), leaderIndex); + builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex); builder.field(SHARD_ID.getPreferredName(), shardId); builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo); @@ -439,6 +453,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { if (o == null || getClass() != o.getClass()) return false; final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o; return leaderIndex.equals(that.leaderIndex) && + followerIndex.equals(that.followerIndex) && shardId == that.shardId && leaderGlobalCheckpoint == that.leaderGlobalCheckpoint && leaderMaxSeqNo == that.leaderMaxSeqNo && @@ -471,6 +486,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { public int hashCode() { return Objects.hash( leaderIndex, + followerIndex, shardId, leaderGlobalCheckpoint, leaderMaxSeqNo, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java index 1074b6905d3..863cb678d7e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -70,8 +69,8 @@ public class CcrStatsAction extends Action { final Map> taskResponsesByIndex = new TreeMap<>(); for (final StatsResponse statsResponse : statsResponse) { taskResponsesByIndex.computeIfAbsent( - statsResponse.followerShardId().getIndexName(), - k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse); + statsResponse.status().followerIndex(), + k -> new TreeMap<>()).put(statsResponse.status().getShardId(), statsResponse); } builder.startObject(); { @@ -150,31 +149,22 @@ public class CcrStatsAction extends Action { public static class StatsResponse implements Writeable { - private final ShardId followerShardId; - - public ShardId followerShardId() { - return followerShardId; - } - private final ShardFollowNodeTaskStatus status; public ShardFollowNodeTaskStatus status() { return status; } - public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) { - this.followerShardId = followerShardId; + public StatsResponse(final ShardFollowNodeTaskStatus status) { this.status = status; } public StatsResponse(final StreamInput in) throws IOException { - this.followerShardId = ShardId.readShardId(in); this.status = new ShardFollowNodeTaskStatus(in); } @Override public void writeTo(final StreamOutput out) throws IOException { - followerShardId.writeTo(out); status.writeTo(out); } diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 9cca4a6e248..83c9fe70e11 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -922,6 +922,9 @@ "leader_index": { "type": "keyword" }, + "follower_index": { + "type": "keyword" + }, "shard_id": { "type": "integer" }, diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java index 47f2bdf5d2e..70b73e5eed0 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -98,6 +98,7 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase