[CCR] Add total fetch time leader stat (#34577)

Add total fetch time leader stat, which
keeps track of how much time was spent on fetches
from the leader cluster's perspective.
This commit is contained in:
Martijn van Groningen 2018-10-23 16:41:06 +02:00 committed by GitHub
parent ca6808e55d
commit e6d87cc09f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 80 additions and 21 deletions

View File

@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -67,6 +68,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE;
private long relativeStartNanos;
public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
this.shardId = shardId;
@ -142,6 +145,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);
// Starting the clock in order to know how much time is spent on fetching operations:
relativeStartNanos = System.nanoTime();
}
@Override
@ -220,6 +226,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
return operations;
}
private long tookInMillis;
public long getTookInMillis() {
return tookInMillis;
}
Response() {
}
@ -228,13 +240,15 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
final Translog.Operation[] operations) {
final Translog.Operation[] operations,
final long tookInMillis) {
this.mappingVersion = mappingVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
this.operations = operations;
this.tookInMillis = tookInMillis;
}
@Override
@ -245,6 +259,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
tookInMillis = in.readVLong();
}
@Override
@ -255,6 +270,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
out.writeArray(Translog.Operation::writeOperation, operations);
out.writeVLong(tookInMillis);
}
@Override
@ -266,12 +282,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Arrays.equals(operations, that.operations);
Arrays.equals(operations, that.operations) &&
tookInMillis == that.tookInMillis;
}
@Override
public int hashCode() {
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
Arrays.hashCode(operations), tookInMillis);
}
}
@ -308,7 +326,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
request.getMaxBatchSize());
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
}
@Override
@ -373,7 +391,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
request.relativeStartNanos));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
@ -459,8 +478,11 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
}
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
long tookInNanos = System.nanoTime() - relativeStartNanos;
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
operations, tookInMillis);
}
}

View File

@ -71,6 +71,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private int numConcurrentReads = 0;
private int numConcurrentWrites = 0;
private long currentMappingVersion = 0;
private long totalFetchTookTimeMillis = 0;
private long totalFetchTimeMillis = 0;
private long numberOfSuccessfulFetches = 0;
private long numberOfFailedFetches = 0;
@ -238,6 +239,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
fetchExceptions.remove(from);
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
totalFetchTookTimeMillis += response.getTookInMillis();
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
operationsReceived += response.getOperations().length;
@ -449,6 +451,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
buffer.size(),
currentMappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,

View File

@ -26,7 +26,8 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
leaderGlobalCheckpoint,
leaderMaxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
operations
operations,
randomNonNegativeLong()
);
}

View File

@ -158,7 +158,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
final long globalCheckpoint = tracker.getCheckpoint();
final long maxSeqNo = tracker.getMaxSeqNo();
handler.accept(new ShardChangesAction.Response(
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0]));
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L));
}
};
threadPool.generic().execute(task);
@ -233,7 +233,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
nextGlobalCheckPoint,
nextGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(EMPTY))
ops.toArray(EMPTY),
randomNonNegativeLong())
)
);
responses.put(prevGlobalCheckpoint, item);
@ -256,7 +257,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
prevGlobalCheckpoint,
prevGlobalCheckpoint,
randomNonNegativeLong(),
EMPTY
EMPTY,
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
}
@ -273,7 +275,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
localLeaderGCP,
localLeaderGCP,
randomNonNegativeLong(),
ops.toArray(EMPTY)
ops.toArray(EMPTY),
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
responses.put(fromSeqNo, Collections.unmodifiableList(item));

View File

@ -56,6 +56,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);

View File

@ -439,7 +439,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0]));
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L));
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@ -782,7 +782,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
randomNonNegativeLong(),
operations
operations,
1L
);
handler.accept(response);
}
@ -813,7 +814,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
leaderGlobalCheckPoint,
leaderGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(new Translog.Operation[0])
ops.toArray(new Translog.Operation[0]),
1L
);
}

View File

@ -429,7 +429,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
if (from > seqNoStats.getGlobalCheckpoint()) {
handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
return;
}
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
@ -440,7 +440,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdatesOrDeletes,
ops
ops,
1L
);
handler.accept(response);
return;

View File

@ -49,6 +49,7 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsA
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);

View File

@ -94,6 +94,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
final long totalFetchTimeMillis = randomLongBetween(0, 4096);
final long totalFetchTookTimeMillis = randomLongBetween(0, 4096);
final long numberOfSuccessfulFetches = randomNonNegativeLong();
final long numberOfFailedFetches = randomLongBetween(0, 8);
final long operationsReceived = randomNonNegativeLong();
@ -122,6 +123,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
numberOfQueuedWrites,
mappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
@ -166,6 +168,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
+ "\"mapping_version\":" + mappingVersion + ","
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
+ "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + ","
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
+ "\"operations_received\":" + operationsReceived + ","
@ -208,6 +211,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
1,
1,
100,
50,
10,
0,
10,
@ -226,7 +230,6 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();

View File

@ -48,6 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
private static final ParseField TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD = new ParseField("total_fetch_leader_time_millis");
private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received");
@ -87,12 +88,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[19],
(long) args[20],
(long) args[21],
(long) args[22],
new TreeMap<>(
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[22])
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[23])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[23],
(ElasticsearchException) args[24]));
(long) args[24],
(ElasticsearchException) args[25]));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
@ -116,6 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
@ -228,6 +231,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
return totalFetchTimeMillis;
}
private final long totalFetchLeaderTimeMillis;
public long totalFetchLeaderTimeMillis() {
return totalFetchLeaderTimeMillis;
}
private final long numberOfSuccessfulFetches;
public long numberOfSuccessfulFetches() {
@ -309,6 +318,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
final int numberOfQueuedWrites,
final long mappingVersion,
final long totalFetchTimeMillis,
final long totalFetchLeaderTimeMillis,
final long numberOfSuccessfulFetches,
final long numberOfFailedFetches,
final long operationsReceived,
@ -334,6 +344,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.numberOfQueuedWrites = numberOfQueuedWrites;
this.mappingVersion = mappingVersion;
this.totalFetchTimeMillis = totalFetchTimeMillis;
this.totalFetchLeaderTimeMillis = totalFetchLeaderTimeMillis;
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
this.numberOfFailedFetches = numberOfFailedFetches;
this.operationsReceived = operationsReceived;
@ -362,6 +373,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.numberOfQueuedWrites = in.readVInt();
this.mappingVersion = in.readVLong();
this.totalFetchTimeMillis = in.readVLong();
this.totalFetchLeaderTimeMillis = in.readVLong();
this.numberOfSuccessfulFetches = in.readVLong();
this.numberOfFailedFetches = in.readVLong();
this.operationsReceived = in.readVLong();
@ -397,6 +409,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
out.writeVInt(numberOfQueuedWrites);
out.writeVLong(mappingVersion);
out.writeVLong(totalFetchTimeMillis);
out.writeVLong(totalFetchLeaderTimeMillis);
out.writeVLong(numberOfSuccessfulFetches);
out.writeVLong(numberOfFailedFetches);
out.writeVLong(operationsReceived);
@ -444,6 +457,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
"total_fetch_time",
new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
builder.humanReadableField(
TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD.getPreferredName(),
"total_fetch_leader_time",
new TimeValue(totalFetchLeaderTimeMillis, TimeUnit.MILLISECONDS));
builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
@ -516,6 +533,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
numberOfQueuedWrites == that.numberOfQueuedWrites &&
mappingVersion == that.mappingVersion &&
totalFetchTimeMillis == that.totalFetchTimeMillis &&
totalFetchLeaderTimeMillis == that.totalFetchLeaderTimeMillis &&
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
numberOfFailedFetches == that.numberOfFailedFetches &&
operationsReceived == that.operationsReceived &&
@ -552,6 +570,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
numberOfQueuedWrites,
mappingVersion,
totalFetchTimeMillis,
totalFetchLeaderTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,

View File

@ -971,6 +971,9 @@
"total_fetch_time_millis": {
"type": "long"
},
"total_fetch_leader_time_millis": {
"type": "long"
},
"number_of_successful_fetches": {
"type": "long"
},