Add follower index to CCR monitoring and status (#33645)

This commit adds the follower index to CCR shard follow task status, and
to monitoring.
This commit is contained in:
Jason Tedor 2018-09-12 17:35:06 -04:00 committed by GitHub
parent b5d8495789
commit eb715d5290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 47 additions and 27 deletions

View File

@ -82,7 +82,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex); createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1)); assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex)); assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow"))); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
// Make sure that there are no other ccr relates operations running: // Make sure that there are no other ccr relates operations running:
assertBusy(() -> { assertBusy(() -> {
@ -206,7 +206,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
} }
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*"); ensureYellow(".monitoring-*");
Request request = new Request("GET", "/.monitoring-*/_search"); Request request = new Request("GET", "/.monitoring-*/_search");
@ -222,7 +222,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
for (int i = 0; i < hits.size(); i++) { for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i); Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
assertThat(leaderIndex, endsWith(leaderIndex)); assertThat(leaderIndex, endsWith(expectedLeaderIndex));
final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
assertThat(followerIndex, equalTo(expectedFollowerIndex));
int foundNumberOfOperationsReceived = int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);

View File

@ -77,7 +77,7 @@ public class FollowIndexIT extends ESRestTestCase {
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true"); index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
} }
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
assertBusy(() -> verifyCcrMonitoring(leaderIndexName)); assertBusy(() -> verifyCcrMonitoring(leaderIndexName, followIndexName));
} }
} }
@ -107,7 +107,7 @@ public class FollowIndexIT extends ESRestTestCase {
ensureYellow("logs-20190101"); ensureYellow("logs-20190101");
verifyDocuments("logs-20190101", 5); verifyDocuments("logs-20190101", 5);
}); });
assertBusy(() -> verifyCcrMonitoring("logs-20190101")); assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101"));
} }
private static void index(RestClient client, String index, String id, Object... fields) throws IOException { private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@ -159,7 +159,7 @@ public class FollowIndexIT extends ESRestTestCase {
} }
} }
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { private static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*"); ensureYellow(".monitoring-*");
Request request = new Request("GET", "/.monitoring-*/_search"); Request request = new Request("GET", "/.monitoring-*/_search");
@ -175,7 +175,10 @@ public class FollowIndexIT extends ESRestTestCase {
for (int i = 0; i < hits.size(); i++) { for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i); Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
assertThat(leaderIndex, endsWith(leaderIndex)); assertThat(leaderIndex, endsWith(expectedLeaderIndex));
final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
assertThat(followerIndex, equalTo(expectedFollowerIndex));
int foundNumberOfOperationsReceived = int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);

View File

@ -418,6 +418,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
} }
return new ShardFollowNodeTaskStatus( return new ShardFollowNodeTaskStatus(
leaderIndex, leaderIndex,
params.getFollowShardId().getIndexName(),
getFollowShardId().getId(), getFollowShardId().getId(),
leaderGlobalCheckpoint, leaderGlobalCheckpoint,
leaderMaxSeqNo, leaderMaxSeqNo,

View File

@ -106,7 +106,7 @@ public class TransportCcrStatsAction extends TransportTasksAction<
final CcrStatsAction.StatsRequest request, final CcrStatsAction.StatsRequest request,
final ShardFollowNodeTask task, final ShardFollowNodeTask task,
final ActionListener<CcrStatsAction.StatsResponse> listener) { final ActionListener<CcrStatsAction.StatsResponse> listener) {
listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus())); listener.onResponse(new CcrStatsAction.StatsResponse(task.getStatus()));
} }
} }

View File

@ -33,6 +33,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
protected ShardFollowNodeTaskStatus createTestInstance() { protected ShardFollowNodeTaskStatus createTestInstance() {
// if you change this constructor, reflect the changes in the hand-written assertions below // if you change this constructor, reflect the changes in the hand-written assertions below
return new ShardFollowNodeTaskStatus( return new ShardFollowNodeTaskStatus(
randomAlphaOfLength(4),
randomAlphaOfLength(4), randomAlphaOfLength(4),
randomInt(), randomInt(),
randomNonNegativeLong(), randomNonNegativeLong(),
@ -61,6 +62,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) { protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) {
assertNotSame(expectedInstance, newInstance); assertNotSame(expectedInstance, newInstance);
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex())); assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex()));
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId())); assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint())); assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo())); assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));

View File

@ -34,6 +34,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
private static final ParseField LEADER_INDEX = new ParseField("leader_index"); private static final ParseField LEADER_INDEX = new ParseField("leader_index");
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
private static final ParseField SHARD_ID = new ParseField("shard_id"); private static final ParseField SHARD_ID = new ParseField("shard_id");
private static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); private static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
private static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no"); private static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no");
@ -62,16 +63,16 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER_NAME, STATUS_PARSER_NAME,
args -> new ShardFollowNodeTaskStatus( args -> new ShardFollowNodeTaskStatus(
(String) args[0], (String) args[0],
(int) args[1], (String) args[1],
(long) args[2], (int) args[2],
(long) args[3], (long) args[3],
(long) args[4], (long) args[4],
(long) args[5], (long) args[5],
(long) args[6], (long) args[6],
(int) args[7], (long) args[7],
(int) args[8], (int) args[8],
(int) args[9], (int) args[9],
(long) args[10], (int) args[10],
(long) args[11], (long) args[11],
(long) args[12], (long) args[12],
(long) args[13], (long) args[13],
@ -81,11 +82,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[17], (long) args[17],
(long) args[18], (long) args[18],
(long) args[19], (long) args[19],
(long) args[20],
new TreeMap<>( new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[20]) ((List<Map.Entry<Long, ElasticsearchException>>) args[21])
.stream() .stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[21])); (long) args[22]));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
@ -96,6 +98,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
static { static {
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
@ -136,6 +139,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
return leaderIndex; return leaderIndex;
} }
private final String followerIndex;
public String followerIndex() {
return followerIndex;
}
private final int shardId; private final int shardId;
public int getShardId() { public int getShardId() {
@ -264,6 +273,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public ShardFollowNodeTaskStatus( public ShardFollowNodeTaskStatus(
final String leaderIndex, final String leaderIndex,
final String followerIndex,
final int shardId, final int shardId,
final long leaderGlobalCheckpoint, final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo, final long leaderMaxSeqNo,
@ -286,6 +296,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
final NavigableMap<Long, ElasticsearchException> fetchExceptions, final NavigableMap<Long, ElasticsearchException> fetchExceptions,
final long timeSinceLastFetchMillis) { final long timeSinceLastFetchMillis) {
this.leaderIndex = leaderIndex; this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId; this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo; this.leaderMaxSeqNo = leaderMaxSeqNo;
@ -311,6 +322,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
this.leaderIndex = in.readString(); this.leaderIndex = in.readString();
this.followerIndex = in.readString();
this.shardId = in.readVInt(); this.shardId = in.readVInt();
this.leaderGlobalCheckpoint = in.readZLong(); this.leaderGlobalCheckpoint = in.readZLong();
this.leaderMaxSeqNo = in.readZLong(); this.leaderMaxSeqNo = in.readZLong();
@ -342,6 +354,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
@Override @Override
public void writeTo(final StreamOutput out) throws IOException { public void writeTo(final StreamOutput out) throws IOException {
out.writeString(leaderIndex); out.writeString(leaderIndex);
out.writeString(followerIndex);
out.writeVInt(shardId); out.writeVInt(shardId);
out.writeZLong(leaderGlobalCheckpoint); out.writeZLong(leaderGlobalCheckpoint);
out.writeZLong(leaderMaxSeqNo); out.writeZLong(leaderMaxSeqNo);
@ -377,6 +390,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex); builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex);
builder.field(SHARD_ID.getPreferredName(), shardId); builder.field(SHARD_ID.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo); builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
@ -439,6 +453,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o; final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
return leaderIndex.equals(that.leaderIndex) && return leaderIndex.equals(that.leaderIndex) &&
followerIndex.equals(that.followerIndex) &&
shardId == that.shardId && shardId == that.shardId &&
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint && leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
leaderMaxSeqNo == that.leaderMaxSeqNo && leaderMaxSeqNo == that.leaderMaxSeqNo &&
@ -471,6 +486,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public int hashCode() { public int hashCode() {
return Objects.hash( return Objects.hash(
leaderIndex, leaderIndex,
followerIndex,
shardId, shardId,
leaderGlobalCheckpoint, leaderGlobalCheckpoint,
leaderMaxSeqNo, leaderMaxSeqNo,

View File

@ -19,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -70,8 +69,8 @@ public class CcrStatsAction extends Action<CcrStatsAction.StatsResponses> {
final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>(); final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>();
for (final StatsResponse statsResponse : statsResponse) { for (final StatsResponse statsResponse : statsResponse) {
taskResponsesByIndex.computeIfAbsent( taskResponsesByIndex.computeIfAbsent(
statsResponse.followerShardId().getIndexName(), statsResponse.status().followerIndex(),
k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse); k -> new TreeMap<>()).put(statsResponse.status().getShardId(), statsResponse);
} }
builder.startObject(); builder.startObject();
{ {
@ -150,31 +149,22 @@ public class CcrStatsAction extends Action<CcrStatsAction.StatsResponses> {
public static class StatsResponse implements Writeable { public static class StatsResponse implements Writeable {
private final ShardId followerShardId;
public ShardId followerShardId() {
return followerShardId;
}
private final ShardFollowNodeTaskStatus status; private final ShardFollowNodeTaskStatus status;
public ShardFollowNodeTaskStatus status() { public ShardFollowNodeTaskStatus status() {
return status; return status;
} }
public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) { public StatsResponse(final ShardFollowNodeTaskStatus status) {
this.followerShardId = followerShardId;
this.status = status; this.status = status;
} }
public StatsResponse(final StreamInput in) throws IOException { public StatsResponse(final StreamInput in) throws IOException {
this.followerShardId = ShardId.readShardId(in);
this.status = new ShardFollowNodeTaskStatus(in); this.status = new ShardFollowNodeTaskStatus(in);
} }
@Override @Override
public void writeTo(final StreamOutput out) throws IOException { public void writeTo(final StreamOutput out) throws IOException {
followerShardId.writeTo(out);
status.writeTo(out); status.writeTo(out);
} }

View File

@ -922,6 +922,9 @@
"leader_index": { "leader_index": {
"type": "keyword" "type": "keyword"
}, },
"follower_index": {
"type": "keyword"
},
"shard_id": { "shard_id": {
"type": "integer" "type": "integer"
}, },

View File

@ -98,6 +98,7 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
final long timeSinceLastFetchMillis = randomNonNegativeLong(); final long timeSinceLastFetchMillis = randomNonNegativeLong();
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus( final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
"cluster_alias:leader_index", "cluster_alias:leader_index",
"follower_index",
shardId, shardId,
leaderGlobalCheckpoint, leaderGlobalCheckpoint,
leaderMaxSeqNo, leaderMaxSeqNo,
@ -139,6 +140,7 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
+ "}," + "},"
+ "\"ccr_stats\":{" + "\"ccr_stats\":{"
+ "\"leader_index\":\"cluster_alias:leader_index\"," + "\"leader_index\":\"cluster_alias:leader_index\","
+ "\"follower_index\":\"follower_index\","
+ "\"shard_id\":" + shardId + "," + "\"shard_id\":" + shardId + ","
+ "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + "," + "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + ","
+ "\"leader_max_seq_no\":" + leaderMaxSeqNo + "," + "\"leader_max_seq_no\":" + leaderMaxSeqNo + ","