Introduce CCR stats endpoint (#32350)

This commit introduces the CCR stats endpoint which provides shard-level
stats on the status of CCR follower tasks.
This commit is contained in:
Jason Tedor 2018-08-03 09:09:45 -04:00 committed by GitHub
parent 6eeb628d6d
commit d640c9ddf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1075 additions and 263 deletions

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionRequest;
@ -34,18 +35,21 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
@ -91,12 +95,12 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
}
return Arrays.asList(
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class)
);
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class));
}
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
@ -104,10 +108,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(
new RestUnfollowIndexAction(settings, restController),
new RestCcrStatsAction(settings, restController),
new RestCreateAndFollowIndexAction(settings, restController),
new RestFollowIndexAction(settings, restController),
new RestCreateAndFollowIndexAction(settings, restController)
);
new RestUnfollowIndexAction(settings, restController));
}
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

View File

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class CcrStatsAction extends Action<CcrStatsAction.TasksResponse> {
public static final String NAME = "cluster:monitor/ccr/stats";
public static final CcrStatsAction INSTANCE = new CcrStatsAction();
private CcrStatsAction() {
super(NAME);
}
@Override
public TasksResponse newResponse() {
return null;
}
public static class TasksResponse extends BaseTasksResponse implements ToXContentObject {
private final List<TaskResponse> taskResponses;
public TasksResponse() {
this(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}
TasksResponse(
final List<TaskOperationFailure> taskFailures,
final List<? extends FailedNodeException> nodeFailures,
final List<TaskResponse> taskResponses) {
super(taskFailures, nodeFailures);
this.taskResponses = taskResponses;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
// sort by index name, then shard ID
final Map<String, Map<Integer, TaskResponse>> taskResponsesByIndex = new TreeMap<>();
for (final TaskResponse taskResponse : taskResponses) {
taskResponsesByIndex.computeIfAbsent(
taskResponse.followerShardId().getIndexName(),
k -> new TreeMap<>()).put(taskResponse.followerShardId().getId(), taskResponse);
}
builder.startObject();
{
for (final Map.Entry<String, Map<Integer, TaskResponse>> index : taskResponsesByIndex.entrySet()) {
builder.startArray(index.getKey());
{
for (final Map.Entry<Integer, TaskResponse> shard : index.getValue().entrySet()) {
shard.getValue().status().toXContent(builder, params);
}
}
builder.endArray();
}
}
builder.endObject();
return builder;
}
}
public static class TasksRequest extends BaseTasksRequest<TasksRequest> implements IndicesRequest {
private String[] indices;
@Override
public String[] indices() {
return indices;
}
public void setIndices(final String[] indices) {
this.indices = indices;
}
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
public void setIndicesOptions(final IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
}
@Override
public boolean match(final Task task) {
/*
* This is a limitation of the current tasks API. When the transport action is executed, the tasks API invokes this match method
* to find the tasks on which to execute the task-level operation (see TransportTasksAction#nodeOperation and
* TransportTasksAction#processTasks). If we do the matching here, then we can not match index patterns. Therefore, we override
* TransportTasksAction#processTasks (see TransportCcrStatsAction#processTasks) and do the matching there. We should never see
* this method invoked and since we can not support matching a task on the basis of the request here, we throw that this
* operation is unsupported.
*/
throw new UnsupportedOperationException();
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}
}
public static class TaskResponse implements Writeable {
private final ShardId followerShardId;
ShardId followerShardId() {
return followerShardId;
}
private final ShardFollowNodeTask.Status status;
ShardFollowNodeTask.Status status() {
return status;
}
TaskResponse(final ShardId followerShardId, final ShardFollowNodeTask.Status status) {
this.followerShardId = followerShardId;
this.status = status;
}
TaskResponse(final StreamInput in) throws IOException {
this.followerShardId = ShardId.readShardId(in);
this.status = new ShardFollowNodeTask.Status(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
followerShardId.writeTo(out);
status.writeTo(out);
}
}
}

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
@ -150,35 +151,45 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
public static final class Response extends ActionResponse {
private long indexMetadataVersion;
private long globalCheckpoint;
private Translog.Operation[] operations;
Response() {
}
Response(long indexMetadataVersion, long globalCheckpoint, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.globalCheckpoint = globalCheckpoint;
this.operations = operations;
}
public long getIndexMetadataVersion() {
return indexMetadataVersion;
}
private long globalCheckpoint;
public long getGlobalCheckpoint() {
return globalCheckpoint;
}
private long maxSeqNo;
public long getMaxSeqNo() {
return maxSeqNo;
}
private Translog.Operation[] operations;
public Translog.Operation[] getOperations() {
return operations;
}
Response() {
}
Response(final long indexMetadataVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.operations = operations;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indexMetadataVersion = in.readVLong();
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@ -187,6 +198,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
super.writeTo(out);
out.writeVLong(indexMetadataVersion);
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
out.writeArray(Translog.Operation::writeOperation, operations);
}
@ -194,19 +206,16 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return indexMetadataVersion == response.indexMetadataVersion &&
globalCheckpoint == response.globalCheckpoint &&
Arrays.equals(operations, response.operations);
final Response that = (Response) o;
return indexMetadataVersion == that.indexMetadataVersion &&
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
Arrays.equals(operations, that.operations);
}
@Override
public int hashCode() {
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Objects.hashCode(globalCheckpoint);
result += Arrays.hashCode(operations);
return result;
return Objects.hash(indexMetadataVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
}
}
@ -231,12 +240,16 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
long globalCheckpoint = indexShard.getGlobalCheckpoint();
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
final Translog.Operation[] operations = getOperations(indexShard, globalCheckpoint, request.fromSeqNo,
request.maxOperationCount, request.maxOperationSizeInBytes);
return new Response(indexMetaDataVersion, globalCheckpoint, operations);
final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.fromSeqNo,
request.maxOperationCount,
request.maxOperationSizeInBytes);
return new Response(indexMetaDataVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
}
@Override
@ -292,4 +305,5 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
}
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
}

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.Logger;
@ -15,14 +16,18 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.io.IOException;
import java.util.ArrayList;
@ -33,10 +38,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
/**
* The node task that fetch the write operations from a leader shard and
@ -59,34 +66,60 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final TimeValue retryTimeout;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;
private volatile long lastRequestedSeqno;
private volatile long leaderGlobalCheckpoint;
private volatile int numConcurrentReads = 0;
private volatile int numConcurrentWrites = 0;
private volatile long followerGlobalCheckpoint = 0;
private volatile long currentIndexMetadataVersion = 0;
private long leaderGlobalCheckpoint;
private long leaderMaxSeqNo;
private long lastRequestedSeqNo;
private long followerGlobalCheckpoint = 0;
private long followerMaxSeqNo = 0;
private int numConcurrentReads = 0;
private int numConcurrentWrites = 0;
private long currentIndexMetadataVersion = 0;
private long totalFetchTimeMillis = 0;
private long numberOfSuccessfulFetches = 0;
private long numberOfFailedFetches = 0;
private long operationsReceived = 0;
private long totalTransferredBytes = 0;
private long totalIndexTimeMillis = 0;
private long numberOfSuccessfulBulkOperations = 0;
private long numberOfFailedBulkOperations = 0;
private long numberOfOperationsIndexed = 0;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler) {
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
this.retryTimeout = params.getRetryTimeout();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
}
void start(long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
this.lastRequestedSeqno = followerGlobalCheckpoint;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
void start(
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
final long followerGlobalCheckpoint,
final long followerMaxSeqNo) {
/*
* While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
* avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
*/
synchronized (this) {
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.followerMaxSeqNo = followerMaxSeqNo;
this.lastRequestedSeqNo = followerGlobalCheckpoint;
}
// Forcefully updates follower mapping, this gets us the leader imd version and
// makes sure that leader and follower mapping are identical.
updateMapping(imdVersion -> {
currentIndexMetadataVersion = imdVersion;
synchronized (ShardFollowNodeTask.this) {
currentIndexMetadataVersion = imdVersion;
}
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}",
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion);
coordinateReads();
@ -99,11 +132,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return;
}
LOGGER.trace("{} coordinate reads, lastRequestedSeqno={}, leaderGlobalCheckpoint={}",
params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint);
LOGGER.trace("{} coordinate reads, lastRequestedSeqNo={}, leaderGlobalCheckpoint={}",
params.getFollowShardId(), lastRequestedSeqNo, leaderGlobalCheckpoint);
final int maxBatchOperationCount = params.getMaxBatchOperationCount();
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
final long from = lastRequestedSeqno + 1;
while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
final long from = lastRequestedSeqNo + 1;
final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount - 1);
final int requestBatchCount;
if (numConcurrentReads == 0) {
@ -117,17 +150,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
params.getFollowShardId(), numConcurrentReads, from, maxRequiredSeqNo, requestBatchCount);
numConcurrentReads++;
sendShardChangesRequest(from, requestBatchCount, maxRequiredSeqNo);
lastRequestedSeqno = maxRequiredSeqNo;
lastRequestedSeqNo = maxRequiredSeqNo;
}
if (numConcurrentReads == 0 && hasReadBudget()) {
assert lastRequestedSeqno == leaderGlobalCheckpoint;
assert lastRequestedSeqNo == leaderGlobalCheckpoint;
// We sneak peek if there is any thing new in the leader.
// If there is we will happily accept
numConcurrentReads++;
long from = lastRequestedSeqno + 1;
long from = lastRequestedSeqNo + 1;
LOGGER.trace("{}[{}] peek read [{}]", params.getFollowShardId(), numConcurrentReads, from);
sendShardChangesRequest(from, maxBatchOperationCount, lastRequestedSeqno);
sendShardChangesRequest(from, maxBatchOperationCount, lastRequestedSeqNo);
}
}
@ -185,9 +218,24 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
final long startTime = relativeTimeProvider.getAsLong();
innerSendShardChangesRequest(from, maxOperationCount,
response -> handleReadResponse(from, maxRequiredSeqNo, response),
e -> handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)));
response -> {
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
operationsReceived += response.getOperations().length;
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
handleReadResponse(from, maxRequiredSeqNo, response);
},
e -> {
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedFetches++;
}
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
});
}
void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
@ -202,6 +250,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
onOperationsFetched(response.getOperations());
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo());
final long newFromSeqNo;
if (response.getOperations().length == 0) {
newFromSeqNo = from;
@ -214,8 +263,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
newFromSeqNo = maxSeqNo + 1;
// update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo);
assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
lastRequestedSeqNo = Math.max(lastRequestedSeqNo, maxSeqNo);
assert lastRequestedSeqNo <= leaderGlobalCheckpoint : "lastRequestedSeqNo [" + lastRequestedSeqNo +
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
coordinateWrites();
}
@ -227,7 +276,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
} else {
// read is completed, decrement
numConcurrentReads--;
if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno) {
if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo) {
// we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
// future requests
LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads",
@ -244,14 +293,29 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, AtomicInteger retryCounter) {
final long startTime = relativeTimeProvider.getAsLong();
innerSendBulkShardOperationsRequest(operations,
this::handleWriteResponse,
e -> handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter))
response -> {
synchronized (ShardFollowNodeTask.this) {
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulBulkOperations++;
numberOfOperationsIndexed += operations.size();
}
handleWriteResponse(response);
},
e -> {
synchronized (ShardFollowNodeTask.this) {
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedBulkOperations++;
}
handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter));
}
);
}
private synchronized void handleWriteResponse(long followerLocalCheckpoint) {
this.followerGlobalCheckpoint = Math.max(this.followerGlobalCheckpoint, followerLocalCheckpoint);
private synchronized void handleWriteResponse(final BulkShardOperationsResponse response) {
this.followerGlobalCheckpoint = Math.max(this.followerGlobalCheckpoint, response.getGlobalCheckpoint());
this.followerMaxSeqNo = Math.max(this.followerMaxSeqNo, response.getMaxSeqNo());
numConcurrentWrites--;
assert numConcurrentWrites >= 0;
coordinateWrites();
@ -308,8 +372,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler);
protected abstract void innerSendBulkShardOperationsRequest(
List<Translog.Operation> operations, Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler);
@ -323,141 +387,399 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return isCancelled() || isCompleted();
}
public ShardId getFollowShardId() {
return params.getFollowShardId();
}
@Override
public Status getStatus() {
return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numConcurrentReads, numConcurrentWrites,
currentIndexMetadataVersion);
public synchronized Status getStatus() {
return new Status(
getFollowShardId().getId(),
leaderGlobalCheckpoint,
leaderMaxSeqNo,
followerGlobalCheckpoint,
followerMaxSeqNo,
lastRequestedSeqNo,
numConcurrentReads,
numConcurrentWrites,
buffer.size(),
currentIndexMetadataVersion,
totalFetchTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
totalTransferredBytes,
totalIndexTimeMillis,
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations,
numberOfOperationsIndexed);
}
public static class Status implements Task.Status {
public static final String NAME = "shard-follow-node-task-status";
static final ParseField SHARD_ID = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no");
static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint");
static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField("last_requested_seqno");
static final ParseField FOLLOWER_MAX_SEQ_NO_FIELD = new ParseField("follower_max_seq_no");
static final ParseField LAST_REQUESTED_SEQ_NO_FIELD = new ParseField("last_requested_seq_no");
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes");
static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version");
static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received");
static final ParseField TOTAL_TRANSFERRED_BYTES = new ParseField("total_transferred_bytes");
static final ParseField TOTAL_INDEX_TIME_MILLIS_FIELD = new ParseField("total_index_time_millis");
static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations");
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations");
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME,
args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4], (long) args[5]));
args -> new Status(
(int) args[0],
(long) args[1],
(long) args[2],
(long) args[3],
(long) args[4],
(long) args[5],
(int) args[6],
(int) args[7],
(int) args[8],
(long) args[9],
(long) args[10],
(long) args[11],
(long) args[12],
(long) args[13],
(long) args[14],
(long) args[15],
(long) args[16],
(long) args[17],
(long) args[18]));
static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQNO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
}
private final int shardId;
public int getShardId() {
return shardId;
}
private final long leaderGlobalCheckpoint;
private final long lastRequestedSeqno;
private final long followerGlobalCheckpoint;
private final int numberOfConcurrentReads;
private final int numberOfConcurrentWrites;
private final long indexMetadataVersion;
Status(long leaderGlobalCheckpoint, long lastRequestedSeqno, long followerGlobalCheckpoint,
int numberOfConcurrentReads, int numberOfConcurrentWrites, long indexMetadataVersion) {
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.lastRequestedSeqno = lastRequestedSeqno;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.numberOfConcurrentReads = numberOfConcurrentReads;
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
this.indexMetadataVersion = indexMetadataVersion;
}
public Status(StreamInput in) throws IOException {
this.leaderGlobalCheckpoint = in.readZLong();
this.lastRequestedSeqno = in.readZLong();
this.followerGlobalCheckpoint = in.readZLong();
this.numberOfConcurrentReads = in.readVInt();
this.numberOfConcurrentWrites = in.readVInt();
this.indexMetadataVersion = in.readVLong();
}
public long getLeaderGlobalCheckpoint() {
public long leaderGlobalCheckpoint() {
return leaderGlobalCheckpoint;
}
public long getLastRequestedSeqno() {
return lastRequestedSeqno;
private final long leaderMaxSeqNo;
public long leaderMaxSeqNo() {
return leaderMaxSeqNo;
}
public long getFollowerGlobalCheckpoint() {
private final long followerGlobalCheckpoint;
public long followerGlobalCheckpoint() {
return followerGlobalCheckpoint;
}
public int getNumberOfConcurrentReads() {
private final long followerMaxSeqNo;
public long followerMaxSeqNo() {
return followerMaxSeqNo;
}
private final long lastRequestedSeqNo;
public long lastRequestedSeqNo() {
return lastRequestedSeqNo;
}
private final int numberOfConcurrentReads;
public int numberOfConcurrentReads() {
return numberOfConcurrentReads;
}
public int getNumberOfConcurrentWrites() {
private final int numberOfConcurrentWrites;
public int numberOfConcurrentWrites() {
return numberOfConcurrentWrites;
}
public long getIndexMetadataVersion() {
private final int numberOfQueuedWrites;
public int numberOfQueuedWrites() {
return numberOfQueuedWrites;
}
private final long indexMetadataVersion;
public long indexMetadataVersion() {
return indexMetadataVersion;
}
private final long totalFetchTimeMillis;
public long totalFetchTimeMillis() {
return totalFetchTimeMillis;
}
private final long numberOfSuccessfulFetches;
public long numberOfSuccessfulFetches() {
return numberOfSuccessfulFetches;
}
private final long numberOfFailedFetches;
public long numberOfFailedFetches() {
return numberOfFailedFetches;
}
private final long operationsReceived;
public long operationsReceived() {
return operationsReceived;
}
private final long totalTransferredBytes;
public long totalTransferredBytes() {
return totalTransferredBytes;
}
private final long totalIndexTimeMillis;
public long totalIndexTimeMillis() {
return totalIndexTimeMillis;
}
private final long numberOfSuccessfulBulkOperations;
public long numberOfSuccessfulBulkOperations() {
return numberOfSuccessfulBulkOperations;
}
private final long numberOfFailedBulkOperations;
public long numberOfFailedBulkOperations() {
return numberOfFailedBulkOperations;
}
private final long numberOfOperationsIndexed;
public long numberOfOperationsIndexed() {
return numberOfOperationsIndexed;
}
Status(
final int shardId,
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
final long followerGlobalCheckpoint,
final long followerMaxSeqNo,
final long lastRequestedSeqNo,
final int numberOfConcurrentReads,
final int numberOfConcurrentWrites,
final int numberOfQueuedWrites,
final long indexMetadataVersion,
final long totalFetchTimeMillis,
final long numberOfSuccessfulFetches,
final long numberOfFailedFetches,
final long operationsReceived,
final long totalTransferredBytes,
final long totalIndexTimeMillis,
final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed) {
this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.followerMaxSeqNo = followerMaxSeqNo;
this.lastRequestedSeqNo = lastRequestedSeqNo;
this.numberOfConcurrentReads = numberOfConcurrentReads;
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
this.numberOfQueuedWrites = numberOfQueuedWrites;
this.indexMetadataVersion = indexMetadataVersion;
this.totalFetchTimeMillis = totalFetchTimeMillis;
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
this.numberOfFailedFetches = numberOfFailedFetches;
this.operationsReceived = operationsReceived;
this.totalTransferredBytes = totalTransferredBytes;
this.totalIndexTimeMillis = totalIndexTimeMillis;
this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations;
this.numberOfFailedBulkOperations = numberOfFailedBulkOperations;
this.numberOfOperationsIndexed = numberOfOperationsIndexed;
}
public Status(final StreamInput in) throws IOException {
this.shardId = in.readVInt();
this.leaderGlobalCheckpoint = in.readZLong();
this.leaderMaxSeqNo = in.readZLong();
this.followerGlobalCheckpoint = in.readZLong();
this.followerMaxSeqNo = in.readZLong();
this.lastRequestedSeqNo = in.readZLong();
this.numberOfConcurrentReads = in.readVInt();
this.numberOfConcurrentWrites = in.readVInt();
this.numberOfQueuedWrites = in.readVInt();
this.indexMetadataVersion = in.readVLong();
this.totalFetchTimeMillis = in.readVLong();
this.numberOfSuccessfulFetches = in.readVLong();
this.numberOfFailedFetches = in.readVLong();
this.operationsReceived = in.readVLong();
this.totalTransferredBytes = in.readVLong();
this.totalIndexTimeMillis = in.readVLong();
this.numberOfSuccessfulBulkOperations = in.readVLong();
this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong();
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(shardId);
out.writeZLong(leaderGlobalCheckpoint);
out.writeZLong(lastRequestedSeqno);
out.writeZLong(leaderMaxSeqNo);
out.writeZLong(followerGlobalCheckpoint);
out.writeZLong(followerMaxSeqNo);
out.writeZLong(lastRequestedSeqNo);
out.writeVInt(numberOfConcurrentReads);
out.writeVInt(numberOfConcurrentWrites);
out.writeVInt(numberOfQueuedWrites);
out.writeVLong(indexMetadataVersion);
out.writeVLong(totalFetchTimeMillis);
out.writeVLong(numberOfSuccessfulFetches);
out.writeVLong(numberOfFailedFetches);
out.writeVLong(operationsReceived);
out.writeVLong(totalTransferredBytes);
out.writeVLong(totalIndexTimeMillis);
out.writeVLong(numberOfSuccessfulBulkOperations);
out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(SHARD_ID.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
builder.field(LAST_REQUESTED_SEQNO_FIELD.getPreferredName(), lastRequestedSeqno);
builder.field(FOLLOWER_MAX_SEQ_NO_FIELD.getPreferredName(), followerMaxSeqNo);
builder.field(LAST_REQUESTED_SEQ_NO_FIELD.getPreferredName(), lastRequestedSeqNo);
builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion);
builder.humanReadableField(
TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
"total_fetch_time",
new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
builder.humanReadableField(
TOTAL_TRANSFERRED_BYTES.getPreferredName(),
"total_transferred",
new ByteSizeValue(totalTransferredBytes, ByteSizeUnit.BYTES));
builder.humanReadableField(
TOTAL_INDEX_TIME_MILLIS_FIELD.getPreferredName(),
"total_index_time",
new TimeValue(totalIndexTimeMillis, TimeUnit.MILLISECONDS));
builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
}
builder.endObject();
return builder;
}
public static Status fromXContent(XContentParser parser) {
public static Status fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return leaderGlobalCheckpoint == status.leaderGlobalCheckpoint &&
lastRequestedSeqno == status.lastRequestedSeqno &&
followerGlobalCheckpoint == status.followerGlobalCheckpoint &&
numberOfConcurrentReads == status.numberOfConcurrentReads &&
numberOfConcurrentWrites == status.numberOfConcurrentWrites &&
indexMetadataVersion == status.indexMetadataVersion;
final Status that = (Status) o;
return shardId == that.shardId &&
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
leaderMaxSeqNo == that.leaderMaxSeqNo &&
followerGlobalCheckpoint == that.followerGlobalCheckpoint &&
followerMaxSeqNo == that.followerMaxSeqNo &&
lastRequestedSeqNo == that.lastRequestedSeqNo &&
numberOfConcurrentReads == that.numberOfConcurrentReads &&
numberOfConcurrentWrites == that.numberOfConcurrentWrites &&
numberOfQueuedWrites == that.numberOfQueuedWrites &&
indexMetadataVersion == that.indexMetadataVersion &&
totalFetchTimeMillis == that.totalFetchTimeMillis &&
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
numberOfFailedFetches == that.numberOfFailedFetches &&
operationsReceived == that.operationsReceived &&
totalTransferredBytes == that.totalTransferredBytes &&
numberOfSuccessfulBulkOperations == that.numberOfSuccessfulBulkOperations &&
numberOfFailedBulkOperations == that.numberOfFailedBulkOperations;
}
@Override
public int hashCode() {
return Objects.hash(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numberOfConcurrentReads,
numberOfConcurrentWrites, indexMetadataVersion);
return Objects.hash(
shardId,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
followerGlobalCheckpoint,
followerMaxSeqNo,
lastRequestedSeqNo,
numberOfConcurrentReads,
numberOfConcurrentWrites,
numberOfQueuedWrites,
indexMetadataVersion,
totalFetchTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
totalTransferredBytes,
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations);
}
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.Version;
@ -160,6 +161,10 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
return idleShardRetryDelay;
}
public String getTaskId() {
return followShardId.getIndex().getUUID() + "-" + followShardId.getId();
}
public Map<String, String> getHeaders() {
return headers;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@ -37,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.util.Arrays;
import java.util.List;
@ -90,7 +92,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Client followerClient = wrapClient(client, params);
BiConsumer<TimeValue, Runnable> scheduler =
(delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler) {
return new ShardFollowNodeTask(
id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime) {
@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
@ -118,11 +121,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
protected void innerSendBulkShardOperationsRequest(
final List<Translog.Operation> operations,
final Consumer<BulkShardOperationsResponse> handler,
final Consumer<Exception> errorHandler) {
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
ActionListener.wrap(response -> handler.accept(response.getGlobalCheckpoint()), errorHandler));
ActionListener.wrap(response -> handler.accept(response), errorHandler));
}
@Override
@ -137,26 +142,35 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
};
}
interface BiLongConsumer {
void accept(long x, long y);
}
@Override
protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) {
Client followerClient = wrapClient(client, params);
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
followerGCP -> shardFollowNodeTask.start(followerGCP, followerGCP), task::markAsFailed);
(followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo), task::markAsFailed);
}
private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
private void fetchGlobalCheckpoint(
final Client client,
final ShardId shardId,
final BiLongConsumer handler,
final Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
.filter(shardStats -> shardStats.getShardRouting().primary())
.findAny();
if (filteredShardStats.isPresent()) {
final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint();
handler.accept(globalCheckPoint);
final SeqNoStats seqNoStats = filteredShardStats.get().getSeqNoStats();
final long globalCheckpoint = seqNoStats.getGlobalCheckpoint();
final long maxSeqNo = seqNoStats.getMaxSeqNo();
handler.accept(globalCheckpoint, maxSeqNo);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
}

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
public class TransportCcrStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
CcrStatsAction.TasksRequest,
CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> {
private final IndexNameExpressionResolver resolver;
@Inject
public TransportCcrStatsAction(
final Settings settings,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver resolver) {
super(
settings,
CcrStatsAction.NAME,
clusterService,
transportService,
actionFilters,
CcrStatsAction.TasksRequest::new,
CcrStatsAction.TasksResponse::new,
Ccr.CCR_THREAD_POOL_NAME);
this.resolver = resolver;
}
@Override
protected CcrStatsAction.TasksResponse newResponse(
final CcrStatsAction.TasksRequest request,
final List<CcrStatsAction.TaskResponse> taskResponses,
final List<TaskOperationFailure> taskOperationFailures,
final List<FailedNodeException> failedNodeExceptions) {
return new CcrStatsAction.TasksResponse(taskOperationFailures, failedNodeExceptions, taskResponses);
}
@Override
protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) throws IOException {
return new CcrStatsAction.TaskResponse(in);
}
@Override
protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
operation.accept(shardFollowNodeTask);
}
}
}
}
@Override
protected void taskOperation(
final CcrStatsAction.TasksRequest request,
final ShardFollowNodeTask task,
final ActionListener<CcrStatsAction.TaskResponse> listener) {
listener.onResponse(new CcrStatsAction.TaskResponse(task.getFollowShardId(), task.getStatus()));
}
}

View File

@ -16,30 +16,43 @@ public final class BulkShardOperationsResponse extends ReplicationResponse imple
private long globalCheckpoint;
BulkShardOperationsResponse() {
}
public long getGlobalCheckpoint() {
return globalCheckpoint;
}
public void setGlobalCheckpoint(long globalCheckpoint) {
public void setGlobalCheckpoint(final long globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
}
private long maxSeqNo;
public long getMaxSeqNo() {
return maxSeqNo;
}
public void setMaxSeqNo(final long maxSeqNo) {
this.maxSeqNo = maxSeqNo;
}
public BulkShardOperationsResponse() {
}
@Override
public void setForcedRefresh(final boolean forcedRefresh) {
}
@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -144,9 +145,11 @@ public class TransportBulkShardOperationsAction
@Override
public synchronized void respond(ActionListener<BulkShardOperationsResponse> listener) {
// Return a fresh global checkpoint after the operations have been replicated for the shard follow task:
BulkShardOperationsResponse response = finalResponseIfSuccessful;
response.setGlobalCheckpoint(primary.getGlobalCheckpoint());
final BulkShardOperationsResponse response = finalResponseIfSuccessful;
final SeqNoStats seqNoStats = primary.seqNoStats();
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
import java.io.IOException;
public class RestCcrStatsAction extends BaseRestHandler {
public RestCcrStatsAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, "/_ccr/stats", this);
controller.registerHandler(RestRequest.Method.GET, "/_ccr/stats/{index}", this);
}
@Override
public String getName() {
return "ccr_stats";
}
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest();
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions()));
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
@ -520,7 +521,7 @@ public class ShardChangesIT extends ESIntegTestCase {
ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus();
assertThat(status, notNullValue());
assertThat("incorrect global checkpoint " + shardFollowTaskParams,
status.getFollowerGlobalCheckpoint(),
status.followerGlobalCheckpoint(),
equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId())));
}
};

View File

@ -14,12 +14,13 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
protected ShardChangesAction.Response createTestInstance() {
final long indexMetadataVersion = randomNonNegativeLong();
final long leaderGlobalCheckpoint = randomNonNegativeLong();
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
final int numOps = randomInt(8);
final Translog.Operation[] operations = new Translog.Operation[numOps];
for (int i = 0; i < numOps; i++) {
operations[i] = new Translog.NoOp(i, 0, "test");
}
return new ShardChangesAction.Response(indexMetadataVersion, leaderGlobalCheckpoint, operations);
return new ShardChangesAction.Response(indexMetadataVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
}
@Override

View File

@ -14,6 +14,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -48,19 +49,19 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun) throws Exception {
task.start(testRun.startSeqNo - 1, testRun.startSeqNo - 1);
task.start(testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1);
assertBusy(() -> {
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.getIndexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
});
task.markAsCompleted();
assertBusy(() -> {
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
});
}
@ -78,7 +79,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
};
List<Translog.Operation> receivedOperations = Collections.synchronizedList(new ArrayList<>());
LocalCheckpointTracker tracker = new LocalCheckpointTracker(testRun.startSeqNo - 1, testRun.startSeqNo - 1);
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
private volatile long indexMetadataVersion = 0L;
private final Map<Long, Integer> fromToSlot = new HashMap<>();
@ -89,15 +91,20 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
protected void innerSendBulkShardOperationsRequest(
List<Translog.Operation> operations,
Consumer<BulkShardOperationsResponse> handler,
Consumer<Exception> errorHandler) {
for(Translog.Operation op : operations) {
tracker.markSeqNoAsCompleted(op.seqNo());
}
receivedOperations.addAll(operations);
// Emulate network thread and avoid SO:
threadPool.generic().execute(() -> handler.accept(tracker.getCheckpoint()));
final BulkShardOperationsResponse response = new BulkShardOperationsResponse();
response.setGlobalCheckpoint(tracker.getCheckpoint());
response.setMaxSeqNo(tracker.getMaxSeqNo());
threadPool.generic().execute(() -> handler.accept(response));
}
@Override
@ -129,7 +136,9 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
} else {
assert from >= testRun.finalExpectedGlobalCheckpoint;
handler.accept(new ShardChangesAction.Response(0L, tracker.getCheckpoint(), new Translog.Operation[0]));
final long globalCheckpoint = tracker.getCheckpoint();
final long maxSeqNo = tracker.getMaxSeqNo();
handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0]));
}
};
threadPool.generic().execute(task);
@ -197,7 +206,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
item.add(new TestResponse(null, indexMetaDataVersion,
new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, ops.toArray(EMPTY))));
new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY))));
responses.put(prevGlobalCheckpoint, item);
} else {
// Simulates a leader shard copy not having all the operations the shard follow task thinks it has by
@ -214,7 +223,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
// Sometimes add an empty shard changes response to also simulate a leader shard lagging behind
if (sometimes()) {
ShardChangesAction.Response response =
new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, EMPTY);
new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY);
item.add(new TestResponse(null, indexMetaDataVersion, response));
}
List<Translog.Operation> ops = new ArrayList<>();
@ -225,8 +234,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
// Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind:
long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo;
ShardChangesAction.Response response = new ShardChangesAction.Response(indexMetaDataVersion,
localLeaderGCP, ops.toArray(EMPTY));
ShardChangesAction.Response response =
new ShardChangesAction.Response(indexMetaDataVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY));
item.add(new TestResponse(null, indexMetaDataVersion, response));
responses.put(fromSeqNo, Collections.unmodifiableList(item));
}

View File

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
@ -20,12 +21,31 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
@Override
protected ShardFollowNodeTask.Status createTestInstance() {
return new ShardFollowNodeTask.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong());
return new ShardFollowNodeTask.Status(
randomInt(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong());
}
@Override
protected Writeable.Reader<ShardFollowNodeTask.Status> instanceReader() {
return ShardFollowNodeTask.Status::new;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
@ -55,8 +56,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(7));
assertThat(status.getLastRequestedSeqno(), equalTo(60L));
assertThat(status.numberOfConcurrentReads(), equalTo(7));
assertThat(status.lastRequestedSeqNo(), equalTo(60L));
}
public void testWriteBuffer() {
@ -75,10 +76,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L));
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(128L));
}
public void testMaxConcurrentReads() {
@ -91,8 +92,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(8L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(7L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(7L));
}
public void testTaskCancelled() {
@ -106,7 +107,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
shardChangesRequests.clear();
// The call the updateMapping is a noop, so noting happens.
task.start(128L, task.getStatus().getFollowerGlobalCheckpoint());
task.start(128L, 128L, task.getStatus().followerGlobalCheckpoint(), task.getStatus().followerMaxSeqNo());
task.markAsCompleted();
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(0));
@ -129,11 +130,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(15L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(31L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(15L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(31L));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() {
@ -153,11 +154,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(128L));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testReceiveRetryableError() {
@ -179,10 +180,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveRetryableErrorRetriedTooManyTimes() {
@ -205,10 +206,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(fatalError, notNullValue());
assertThat(fatalError.getMessage(), containsString("retrying failed ["));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveNonRetryableError() {
@ -226,10 +227,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(task.isStopped(), equalTo(true));
assertThat(fatalError, sameInstance(failure));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testHandleReadResponse() {
@ -244,13 +245,13 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.indexMetadataVersion(), equalTo(0L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testReceiveLessThanRequested() {
@ -271,10 +272,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(43L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testCancelAndReceiveLessThanRequested() {
@ -294,10 +295,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.size(), equalTo(0));
assertThat(bulkShardOperationRequests.size(), equalTo(0));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveNothingExpectedSomething() {
@ -310,18 +311,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 63L,
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0]));
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testDelayCoordinatesRead() {
@ -343,7 +343,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
// Also invokes coordinateReads()
task.innerHandleReadResponse(0L, 63L, response);
task.innerHandleReadResponse(64L, 63L,
new ShardChangesAction.Response(0, 63L, new Translog.Operation[0]));
new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0]));
assertThat(counter[0], equalTo(1));
}
@ -360,12 +360,12 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.indexMetadataVersion(), equalTo(1L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testMappingUpdateRetryableError() {
@ -385,11 +385,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.indexMetadataVersion(), equalTo(1L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
@ -411,11 +411,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.size(), equalTo(0));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.indexMetadataVersion(), equalTo(0L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testMappingUpdateNonRetryableError() {
@ -430,11 +430,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.size(), equalTo(0));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.indexMetadataVersion(), equalTo(0L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testCoordinateWrites() {
@ -454,11 +454,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testMaxConcurrentWrites() {
@ -472,7 +472,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(1), equalTo(Arrays.asList(response.getOperations()).subList(64, 128)));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(2));
assertThat(status.numberOfConcurrentWrites(), equalTo(2));
task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE);
response = generateShardChangesResponse(0, 256, 0L, 256L);
@ -486,7 +486,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(3), equalTo(Arrays.asList(response.getOperations()).subList(192, 256)));
status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(4));
assertThat(status.numberOfConcurrentWrites(), equalTo(4));
}
public void testMaxBatchOperationCount() {
@ -502,7 +502,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(32));
assertThat(status.numberOfConcurrentWrites(), equalTo(32));
}
public void testRetryableError() {
@ -529,8 +529,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testRetryableErrorRetriedTooManyTimes() {
@ -557,8 +557,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testNonRetryableError() {
@ -579,8 +579,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testMaxBatchBytesLimit() {
@ -623,10 +623,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(63L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.followerGlobalCheckpoint(), equalTo(63L));
}
ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches,
@ -643,7 +643,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
mappingUpdateFailures = new LinkedList<>();
imdVersions = new LinkedList<>();
followerGlobalCheckpoints = new LinkedList<>();
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
@ -660,8 +661,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
protected void innerSendBulkShardOperationsRequest(
final List<Translog.Operation> operations,
final Consumer<BulkShardOperationsResponse> handler,
final Consumer<Exception> errorHandler) {
bulkShardOperationRequests.add(operations);
Exception writeFailure = ShardFollowNodeTaskTests.this.writeFailures.poll();
if (writeFailure != null) {
@ -670,7 +673,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
Long followerGlobalCheckpoint = followerGlobalCheckpoints.poll();
if (followerGlobalCheckpoint != null) {
handler.accept(followerGlobalCheckpoint);
final BulkShardOperationsResponse response = new BulkShardOperationsResponse();
response.setGlobalCheckpoint(followerGlobalCheckpoint);
response.setMaxSeqNo(followerGlobalCheckpoint);
handler.accept(response);
}
}
@ -710,12 +716,13 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
return new ShardChangesAction.Response(imdVersion, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
return new ShardChangesAction.Response(
imdVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
}
void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
// The call the updateMapping is a noop, so noting happens.
task.start(leaderGlobalCheckpoint, followerGlobalCheckpoint);
task.start(leaderGlobalCheckpoint, leaderGlobalCheckpoint, followerGlobalCheckpoint, followerGlobalCheckpoint);
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -53,7 +54,13 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
docCount += leaderGroup.appendDocs(randomInt(128));
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
@ -84,7 +91,13 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leaderGroup.startAll();
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
int docCount = 256;
leaderGroup.appendDocs(1);
Runnable task = () -> {
@ -151,7 +164,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@Override
protected synchronized void onOperationsFetched(Translog.Operation[] operations) {
super.onOperationsFetched(operations);
@ -169,12 +183,13 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
protected void innerSendBulkShardOperationsRequest(
final List<Translog.Operation> operations,
final Consumer<BulkShardOperationsResponse> handler,
final Consumer<Exception> errorHandler) {
Runnable task = () -> {
BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
ActionListener<BulkShardOperationsResponse> listener =
ActionListener.wrap(r -> handler.accept(r.getGlobalCheckpoint()), errorHandler);
ActionListener<BulkShardOperationsResponse> listener = ActionListener.wrap(handler::accept, errorHandler);
new CCRAction(request, listener, followerGroup).execute();
};
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
@ -190,12 +205,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
Exception exception = null;
for (IndexShard indexShard : indexShards) {
long globalCheckpoint = indexShard.getGlobalCheckpoint();
final SeqNoStats seqNoStats = indexShard.seqNoStats();
try {
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, globalCheckpoint, from,
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
maxOperationCount, params.getMaxBatchSizeInBytes());
// Hard code index metadata version, this is ok, as mapping updates are not tested here.
handler.accept(new ShardChangesAction.Response(1L, globalCheckpoint, ops));
final ShardChangesAction.Response response =
new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops);
handler.accept(response);
return;
} catch (Exception e) {
exception = e;

View File

@ -0,0 +1,16 @@
{
"ccr.stats": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "GET" ],
"url": {
"path": "/_ccr/stats",
"paths": [ "/_ccr/stats", "/_ccr/stats/{index}" ],
"parts": {
"index": {
"type": "list",
"description": "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
}
}
}
}
}

View File

@ -0,0 +1,51 @@
---
"Test stats":
- do:
indices.create:
index: foo
body:
settings:
index:
soft_deletes:
enabled: true
mappings:
doc:
properties:
field:
type: keyword
- do:
ccr.create_and_follow_index:
index: bar
body:
leader_index: foo
# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
- do:
ccr.stats:
index: bar
- match: { bar.0.shard_id: 0 }
- match: { bar.0.leader_global_checkpoint: -1 }
- match: { bar.0.leader_max_seq_no: -1 }
- match: { bar.0.follower_global_checkpoint: -1 }
- match: { bar.0.follower_max_seq_no: -1 }
- match: { bar.0.last_requested_seq_no: -1 }
- match: { bar.0.number_of_concurrent_reads: 0 }
- match: { bar.0.number_of_concurrent_writes: 0 }
- match: { bar.0.number_of_queued_writes: 0 }
- match: { bar.0.index_metadata_version: 0 }
- match: { bar.0.total_fetch_time_millis: 0 }
- match: { bar.0.number_of_successful_fetches: 0 }
- match: { bar.0.number_of_failed_fetches: 0 }
- match: { bar.0.operations_received: 0 }
- match: { bar.0.total_transferred_bytes: 0 }
- match: { bar.0.total_index_time_millis: 0 }
- match: { bar.0.number_of_successful_bulk_operations: 0 }
- match: { bar.0.number_of_failed_bulk_operations: 0 }
- match: { bar.0.number_of_operations_indexed: 0 }
- do:
ccr.unfollow_index:
index: bar
- is_true: acknowledged