ccr: use indices stats api to fetch global checkpoint of the follower shards and

keep track of shard follow stats inside the shard follow node task instead of the persistent task status.

By maintaining the shard follow stats inside the node task, stats updates are quicker because
no cluster state update is required. The stats are now transient, meaning that if the task
moves to a different node then the stats are gone too. Currently only the processed
global checkpoint is tracked, and it is restored via the indices stats api when a shard follow node task
starts (which is the reason for the first change in this commit). For other stats
that we may add in the future (like fetch_time, see: https://gist.github.com/s1monw/dba13daf8493bf48431b72365e110717)
it is ok to start from zero when a shard follow task moves to another node.
This commit is contained in:
Martijn van Groningen 2018-04-04 11:09:40 +02:00
parent d976fa44e7
commit d77f756f5c
7 changed files with 227 additions and 156 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
@ -112,8 +113,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
ShardFollowTask::new),
// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowTask.NAME,
ShardFollowTask.Status::new)
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME,
ShardFollowNodeTask.Status::new)
);
}
@ -124,8 +125,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
ShardFollowTask::fromXContent),
// Task statuses
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ShardFollowTask.NAME),
ShardFollowTask.Status::fromXContent)
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME),
ShardFollowNodeTask.Status::fromXContent)
);
}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Node-level task for shard following. It holds the processed global checkpoint in memory
 * and exposes it through {@link #getStatus()}.
 */
public class ShardFollowNodeTask extends AllocatedPersistentTask {

    /** Last processed global checkpoint; starts at zero (the {@link AtomicLong} default). */
    private final AtomicLong processedGlobalCheckpoint = new AtomicLong();

    ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
        super(id, type, action, description, parentTask, headers);
    }

    /**
     * Records the given value as the processed global checkpoint reported by {@link #getStatus()}.
     *
     * @param processedGlobalCheckpoint the new processed global checkpoint
     */
    void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
        this.processedGlobalCheckpoint.set(processedGlobalCheckpoint);
    }

    /**
     * @return a new {@link Status} snapshot carrying the current processed global checkpoint
     */
    @Override
    public Task.Status getStatus() {
        return new Status(processedGlobalCheckpoint.get());
    }

    /**
     * Immutable status object carrying the processed global checkpoint. It can be written to and
     * read from a stream, and rendered to and parsed from x-content.
     */
    public static class Status implements Task.Status {

        public static final String NAME = "shard-follow-node-task-status";

        static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");

        static final ConstructingObjectParser<Status, Void> PARSER =
            new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0]));

        static {
            PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD);
        }

        private final long processedGlobalCheckpoint;

        Status(long processedGlobalCheckpoint) {
            this.processedGlobalCheckpoint = processedGlobalCheckpoint;
        }

        /**
         * Reads a status from the stream; the counterpart of {@link #writeTo(StreamOutput)}.
         *
         * @param in the stream to read from
         * @throws IOException if reading from the stream fails
         */
        public Status(StreamInput in) throws IOException {
            this.processedGlobalCheckpoint = in.readVLong();
        }

        /**
         * @return the processed global checkpoint held by this status
         */
        public long getProcessedGlobalCheckpoint() {
            return processedGlobalCheckpoint;
        }

        @Override
        public String getWriteableName() {
            return NAME;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            // NOTE(review): written with writeVLong, which presumably expects a non-negative value - confirm.
            out.writeVLong(processedGlobalCheckpoint);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            {
                builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint);
            }
            builder.endObject();
            return builder;
        }

        /**
         * Parses a status from x-content using {@link #PARSER}.
         *
         * @param parser the parser positioned on the status object
         * @return the parsed status
         */
        public static Status fromXContent(XContentParser parser) {
            return PARSER.apply(parser, null);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Status status = (Status) o;
            return processedGlobalCheckpoint == status.processedGlobalCheckpoint;
        }

        @Override
        public int hashCode() {
            return Objects.hash(processedGlobalCheckpoint);
        }

        @Override
        public String toString() {
            return Strings.toString(this);
        }
    }
}

View File

@ -147,69 +147,4 @@ public class ShardFollowTask implements PersistentTaskParams {
return Strings.toString(this);
}
/**
 * Mutable status object carrying the processed global checkpoint. It can be written to and
 * read from a stream, and rendered to and parsed from x-content.
 */
public static class Status implements Task.Status {

    static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");

    static final ObjectParser<Status, Void> PARSER = new ObjectParser<>(NAME, Status::new);

    static {
        PARSER.declareLong(Status::setProcessedGlobalCheckpoint, PROCESSED_GLOBAL_CHECKPOINT_FIELD);
    }

    private long processedGlobalCheckpoint;

    /** Creates a status whose processed global checkpoint is the field default (zero). */
    public Status() {
    }

    /**
     * Reads a status from the stream; the counterpart of {@link #writeTo(StreamOutput)}.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public Status(StreamInput in) throws IOException {
        processedGlobalCheckpoint = in.readZLong();
    }

    /**
     * @return the processed global checkpoint held by this status
     */
    public long getProcessedGlobalCheckpoint() {
        return processedGlobalCheckpoint;
    }

    /**
     * @param processedGlobalCheckpoint the processed global checkpoint to store
     */
    public void setProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
        this.processedGlobalCheckpoint = processedGlobalCheckpoint;
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeZLong(processedGlobalCheckpoint);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint);
        return builder.endObject();
    }

    /**
     * Parses a status from x-content using {@link #PARSER}.
     *
     * @param parser the parser positioned on the status object
     * @return the parsed status
     */
    public static Status fromXContent(XContentParser parser) {
        return PARSER.apply(parser, null);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Status status = (Status) o;
        return processedGlobalCheckpoint == status.processedGlobalCheckpoint;
    }

    @Override
    public int hashCode() {
        return Objects.hash(processedGlobalCheckpoint);
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }
}
}

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
@ -20,10 +21,11 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
@ -33,6 +35,7 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -40,6 +43,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
@ -72,19 +76,21 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
final ShardFollowTask.Status shardFollowStatus = (ShardFollowTask.Status) status;
final long followGlobalCheckPoint;
if (shardFollowStatus != null) {
followGlobalCheckPoint = shardFollowStatus.getProcessedGlobalCheckpoint();
} else {
followGlobalCheckPoint = 0;
}
logger.info("Starting shard following [{}]", params);
prepare(task, params, followGlobalCheckPoint);
// Builds the node-level task object as a ShardFollowNodeTask, using the description
// derived from the persistent task.
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
                                             PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
                                             Map<String, String> headers) {
    final String description = getDescription(taskInProgress);
    return new ShardFollowNodeTask(id, type, action, description, parentTaskId, headers);
}
void prepare(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) {
/**
 * Starts shard following: fetches the global checkpoint of the follow shard and hands it to
 * {@code prepare}; a failure to fetch it marks the task as failed.
 */
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
    final ShardFollowNodeTask nodeTask = (ShardFollowNodeTask) task;
    logger.info("Starting shard following [{}]", params);
    // Invoked with the follow shard's global checkpoint once it has been fetched.
    final LongConsumer onFollowCheckpoint = checkpoint -> prepare(nodeTask, params, checkpoint);
    fetchGlobalCheckpoint(params.getFollowShardId(), onFollowCheckpoint, task::markAsFailed);
}
void prepare(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
@ -92,52 +98,31 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
client.admin().indices().stats(new IndicesStatsRequest().indices(leaderShard.getIndexName()), ActionListener.wrap(r -> {
Optional<ShardStats> leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard))
.filter(shardStats -> shardStats.getShardRouting().primary())
.findAny();
if (leaderShardStats.isPresent()) {
// Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1.
final long leaderGlobalCheckPoint = Math.max(0, leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint());
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(task, params, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
task.updatePersistentStatus(newStatus, ActionListener.wrap(
persistentTask -> logger.debug("[{}] Successfully updated global checkpoint to {}",
leaderShard, leaderGlobalCheckPoint),
updateStatusException -> {
logger.error(new ParameterizedMessage("[{}] Failed to update global checkpoint to {}",
leaderShard, leaderGlobalCheckPoint), updateStatusException);
task.markAsFailed(updateStatusException);
}
));
prepare(task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
fetchGlobalCheckpoint(leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(task, params, followGlobalCheckPoint);
} else {
task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard"));
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
}, task::markAsFailed));
}, task::markAsFailed);
}
private void retry(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) {
private void retry(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -151,6 +136,24 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
});
}
/**
 * Fetches the global checkpoint of the primary copy of the given shard via the indices stats api.
 *
 * @param shardId      the shard whose global checkpoint should be fetched
 * @param handler      receives the global checkpoint, with negative values clamped to 0
 * @param errorHandler receives the failure if the stats request fails or no primary stats are found
 */
private void fetchGlobalCheckpoint(ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
    final String indexName = shardId.getIndexName();
    client.admin().indices().stats(new IndicesStatsRequest().indices(indexName), ActionListener.wrap(r -> {
        IndexStats indexStats = r.getIndex(indexName);
        // NOTE(review): presumably getIndex returns null when the response has no stats for the
        // index - confirm. Guarding here reports a descriptive error rather than a bare NPE.
        if (indexStats == null) {
            errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
            return;
        }
        Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
            .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
            .filter(shardStats -> shardStats.getShardRouting().primary())
            .findAny();
        if (filteredShardStats.isPresent()) {
            // Treat -1 as 0. If no indexing has happened in the shard then global checkpoint is -1.
            final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint());
            handler.accept(globalCheckPoint);
        } else {
            errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
        }
    }, errorHandler));
}
static class ChunksCoordinator {
private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);

View File

@ -23,6 +23,8 @@
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
@ -36,10 +38,12 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackSettings;
@ -49,6 +53,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -243,9 +248,28 @@ public class ShardChangesIT extends ESIntegTestCase {
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(ShardFollowTask.NAME + "[c]");
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).actionGet();
assertThat(listTasksResponse.getNodeFailures().size(), equalTo(0));
assertThat(listTasksResponse.getTaskFailures().size(), equalTo(0));
List<TaskInfo> taskInfos = listTasksResponse.getTasks();
assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
final ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
TaskInfo taskInfo = null;
String expectedId = "id=" + task.getId();
for (TaskInfo info : taskInfos) {
if (expectedId.equals(info.getDescription())) {
taskInfo = info;
break;
}
}
assertThat(taskInfo, notNullValue());
ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus();
assertThat(status, notNullValue());
assertThat(
status.getProcessedGlobalCheckpoint(),

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
/**
 * Runs {@link ShardFollowNodeTask.Status} through the {@link AbstractSerializingTestCase} harness.
 */
public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> {

    /** Builds a status with a random non-negative processed global checkpoint. */
    @Override
    protected ShardFollowNodeTask.Status createTestInstance() {
        final long processedGlobalCheckpoint = randomNonNegativeLong();
        return new ShardFollowNodeTask.Status(processedGlobalCheckpoint);
    }

    /** Supplies the stream constructor used to read a status back. */
    @Override
    protected Writeable.Reader<ShardFollowNodeTask.Status> instanceReader() {
        return ShardFollowNodeTask.Status::new;
    }

    /** Parses a status from x-content via {@code ShardFollowNodeTask.Status.fromXContent}. */
    @Override
    protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) throws IOException {
        final ShardFollowNodeTask.Status parsed = ShardFollowNodeTask.Status.fromXContent(parser);
        return parsed;
    }
}

View File

@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
/**
 * Runs {@link ShardFollowTask.Status} through the {@link AbstractSerializingTestCase} harness.
 */
public class ShardFollowTaskStatusTests extends AbstractSerializingTestCase<ShardFollowTask.Status> {

    /** Builds a status and sets a random non-negative processed global checkpoint on it. */
    @Override
    protected ShardFollowTask.Status createTestInstance() {
        final ShardFollowTask.Status instance = new ShardFollowTask.Status();
        instance.setProcessedGlobalCheckpoint(randomNonNegativeLong());
        return instance;
    }

    /** Supplies the stream constructor used to read a status back. */
    @Override
    protected Writeable.Reader<ShardFollowTask.Status> instanceReader() {
        return ShardFollowTask.Status::new;
    }

    /** Parses a status from x-content via {@code ShardFollowTask.Status.fromXContent}. */
    @Override
    protected ShardFollowTask.Status doParseInstance(XContentParser parser) throws IOException {
        final ShardFollowTask.Status parsed = ShardFollowTask.Status.fromXContent(parser);
        return parsed;
    }
}