[CCR] Introduce leader index name & last fetch time stats to stats api response (#33155)

This commit is contained in:
Martijn van Groningen 2018-08-29 10:54:58 +07:00 committed by GitHub
parent c42dc77896
commit 41c7fc8d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 12 deletions

View File

@ -67,6 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
private final String leaderIndex;
private final ShardFollowTask params;
private final TimeValue retryTimeout;
private final TimeValue idleShardChangesRequestDelay;
@ -90,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfSuccessfulBulkOperations = 0;
private long numberOfFailedBulkOperations = 0;
private long numberOfOperationsIndexed = 0;
private long lastFetchTime = -1;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
@ -112,6 +114,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return size() > params.getMaxConcurrentReadBatches();
}
};
if (params.getLeaderClusterAlias() != null) {
leaderIndex = params.getLeaderClusterAlias() + ":" + params.getLeaderShardId().getIndexName();
} else {
leaderIndex = params.getLeaderShardId().getIndexName();
}
}
void start(
@ -235,6 +243,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
final long startTime = relativeTimeProvider.getAsLong();
synchronized (this) {
lastFetchTime = startTime;
}
innerSendShardChangesRequest(from, maxOperationCount,
response -> {
synchronized (ShardFollowNodeTask.this) {
@ -411,7 +422,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
@Override
public synchronized Status getStatus() {
final long timeSinceLastFetchMillis;
if (lastFetchTime != -1) {
timeSinceLastFetchMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - lastFetchTime);
} else {
// To avoid confusion when ccr didn't yet execute a fetch:
timeSinceLastFetchMillis = -1;
}
return new Status(
leaderIndex,
getFollowShardId().getId(),
leaderGlobalCheckpoint,
leaderMaxSeqNo,
@ -431,13 +450,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
new TreeMap<>(fetchExceptions));
new TreeMap<>(fetchExceptions),
timeSinceLastFetchMillis);
}
public static class Status implements Task.Status {
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
static final ParseField LEADER_INDEX = new ParseField("leader_index");
static final ParseField SHARD_ID = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no");
@ -458,20 +479,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations");
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");
static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField("time_since_last_fetch_millis");
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<Status, Void> STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME,
args -> new Status(
(int) args[0],
(long) args[1],
(String) args[0],
(int) args[1],
(long) args[2],
(long) args[3],
(long) args[4],
(long) args[5],
(int) args[6],
(long) args[6],
(int) args[7],
(int) args[8],
(long) args[9],
(int) args[9],
(long) args[10],
(long) args[11],
(long) args[12],
@ -481,10 +503,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
(long) args[16],
(long) args[17],
(long) args[18],
(long) args[19],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[19])
((List<Map.Entry<Long, ElasticsearchException>>) args[20])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[21]));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
@ -494,6 +518,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
static {
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
@ -514,6 +539,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_FETCH_MILLIS_FIELD);
}
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
@ -527,6 +553,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
}
private final String leaderIndex;
public String leaderIndex() {
return leaderIndex;
}
private final int shardId;
public int getShardId() {
@ -647,7 +679,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return fetchExceptions;
}
private final long timeSinceLastFetchMillis;
public long timeSinceLastFetchMillis() {
return timeSinceLastFetchMillis;
}
Status(
final String leaderIndex,
final int shardId,
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
@ -667,7 +706,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed,
final NavigableMap<Long, ElasticsearchException> fetchExceptions) {
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
final long timeSinceLastFetchMillis) {
this.leaderIndex = leaderIndex;
this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
@ -688,9 +729,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.numberOfFailedBulkOperations = numberOfFailedBulkOperations;
this.numberOfOperationsIndexed = numberOfOperationsIndexed;
this.fetchExceptions = Objects.requireNonNull(fetchExceptions);
this.timeSinceLastFetchMillis = timeSinceLastFetchMillis;
}
public Status(final StreamInput in) throws IOException {
this.leaderIndex = in.readString();
this.shardId = in.readVInt();
this.leaderGlobalCheckpoint = in.readZLong();
this.leaderMaxSeqNo = in.readZLong();
@ -711,6 +754,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong();
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
this.timeSinceLastFetchMillis = in.readZLong();
}
@Override
@ -720,6 +764,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(leaderIndex);
out.writeVInt(shardId);
out.writeZLong(leaderGlobalCheckpoint);
out.writeZLong(leaderMaxSeqNo);
@ -740,12 +785,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed);
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
out.writeZLong(timeSinceLastFetchMillis);
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
builder.field(SHARD_ID.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
@ -791,6 +838,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
}
builder.endArray();
builder.humanReadableField(
TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(),
"time_since_last_fetch",
new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS));
}
builder.endObject();
return builder;
@ -805,7 +856,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Status that = (Status) o;
return shardId == that.shardId &&
return leaderIndex.equals(that.leaderIndex) &&
shardId == that.shardId &&
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
leaderMaxSeqNo == that.leaderMaxSeqNo &&
followerGlobalCheckpoint == that.followerGlobalCheckpoint &&
@ -829,12 +881,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
* keys.
*/
fetchExceptions.keySet().equals(that.fetchExceptions.keySet()) &&
getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that));
getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) &&
timeSinceLastFetchMillis == that.timeSinceLastFetchMillis;
}
@Override
public int hashCode() {
return Objects.hash(
leaderIndex,
shardId,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
@ -858,7 +912,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
* messages. Note that we are relying on the fact that the fetch exceptions are ordered by keys.
*/
fetchExceptions.keySet(),
getFetchExceptionMessages(this));
getFetchExceptionMessages(this),
timeSinceLastFetchMillis);
}
private static List<String> getFetchExceptionMessages(final Status status) {

View File

@ -32,6 +32,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
protected ShardFollowNodeTask.Status createTestInstance() {
// if you change this constructor, reflect the changes in the hand-written assertions below
return new ShardFollowNodeTask.Status(
randomAlphaOfLength(4),
randomInt(),
randomNonNegativeLong(),
randomNonNegativeLong(),
@ -51,12 +52,14 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions());
randomReadExceptions(),
randomLong());
}
@Override
protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) {
assertNotSame(expectedInstance, newInstance);
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));
@ -87,6 +90,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
}
assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis()));
}
@Override

View File

@ -19,11 +19,15 @@
index: bar
body:
leader_index: foo
- is_true: follow_index_created
- is_true: follow_index_shards_acked
- is_true: index_following_started
# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
- do:
ccr.stats:
index: bar
- match: { bar.0.leader_index: "foo" }
- match: { bar.0.shard_id: 0 }
- gte: { bar.0.leader_global_checkpoint: -1 }
- gte: { bar.0.leader_max_seq_no: -1 }
@ -44,6 +48,7 @@
- match: { bar.0.number_of_failed_bulk_operations: 0 }
- match: { bar.0.number_of_operations_indexed: 0 }
- length: { bar.0.fetch_exceptions: 0 }
- gte: { bar.0.time_since_last_fetch_millis: -1 }
- do:
ccr.unfollow_index: