diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 1a1fd4cffa5..c5dd6f861fc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; @@ -112,8 +113,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ShardFollowTask::new), // Task statuses - new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowTask.NAME, - ShardFollowTask.Status::new) + new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, + ShardFollowNodeTask.Status::new) ); } @@ -124,8 +125,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ShardFollowTask::fromXContent), // Task statuses - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ShardFollowTask.NAME), - ShardFollowTask.Status::fromXContent) + new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), + ShardFollowNodeTask.Status::fromXContent) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java new file mode 100644 index 00000000000..9d1fe5d67df --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class ShardFollowNodeTask extends AllocatedPersistentTask { + + private final AtomicLong processedGlobalCheckpoint = new AtomicLong(); + + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + super(id, type, action, description, parentTask, headers); + } + + void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { + this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); + } + + @Override + public Task.Status getStatus() { + return new Status(processedGlobalCheckpoint.get()); + } + + public static class Status implements Task.Status { + + public static final String NAME = "shard-follow-node-task-status"; + + static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + + static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0])); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD); + } + + private final long processedGlobalCheckpoint; + + Status(long processedGlobalCheckpoint) { + this.processedGlobalCheckpoint = processedGlobalCheckpoint; + } + + public Status(StreamInput in) throws IOException { + this.processedGlobalCheckpoint = in.readVLong(); + } + + public long getProcessedGlobalCheckpoint() { + return processedGlobalCheckpoint; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(processedGlobalCheckpoint); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); + } + builder.endObject(); + return builder; + } + + public static Status fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + } + + @Override + public int hashCode() { + return Objects.hash(processedGlobalCheckpoint); + } + + public String toString() { + return Strings.toString(this); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index b5a2ba2e031..e153bb2ec56 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -147,69 +147,4 @@ public class ShardFollowTask implements PersistentTaskParams { return Strings.toString(this); } - public static class Status implements Task.Status { - - static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); - - static final ObjectParser PARSER = new ObjectParser<>(NAME, Status::new); - - static { - PARSER.declareLong(Status::setProcessedGlobalCheckpoint, PROCESSED_GLOBAL_CHECKPOINT_FIELD); - } - - private long processedGlobalCheckpoint; - - public Status() { - } - - public Status(StreamInput in) throws IOException { - processedGlobalCheckpoint = in.readZLong(); - } - - public long getProcessedGlobalCheckpoint() { - return processedGlobalCheckpoint; - } - - public void setProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { - this.processedGlobalCheckpoint = processedGlobalCheckpoint; - } - - @Override - public String getWriteableName() { - return NAME; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeZLong(processedGlobalCheckpoint); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); - return builder.endObject(); - } - - public static Status fromXContent(XContentParser parser) { - return PARSER.apply(parser, null); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Status status = (Status) o; - return processedGlobalCheckpoint == status.processedGlobalCheckpoint; - } - - @Override - public int hashCode() { - return Objects.hash(processedGlobalCheckpoint); - } - - public String toString() { - return Strings.toString(this); - } - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index dbef0c150cd..2cfc836363a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; @@ -20,10 +21,11 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; @@ -33,6 +35,7 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksExecutor; import java.util.Arrays; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -40,6 +43,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.LongConsumer; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { @@ -72,19 +76,21 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor taskInProgress, + Map headers) { + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); } - void prepare(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) { + @Override + protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) { + ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; + logger.info("Starting shard following [{}]", params); + fetchGlobalCheckpoint(params.getFollowShardId(), + followGlobalCheckPoint -> prepare(shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed); + } + + void prepare(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { if (task.getState() != AllocatedPersistentTask.State.STARTED) { // TODO: need better cancellation control return; @@ -92,52 +98,31 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - Optional leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards()) - .filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard)) - .filter(shardStats -> shardStats.getShardRouting().primary()) - .findAny(); - - if (leaderShardStats.isPresent()) { - // Threat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1. - final long leaderGlobalCheckPoint = Math.max(0, leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint()); - // TODO: check if both indices have the same history uuid - if (leaderGlobalCheckPoint == followGlobalCheckPoint) { - retry(task, params, followGlobalCheckPoint); - } else { - assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + - "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; - Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); - Consumer handler = e -> { - if (e == null) { - ShardFollowTask.Status newStatus = new ShardFollowTask.Status(); - newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - task.updatePersistentStatus(newStatus, ActionListener.wrap( - persistentTask -> logger.debug("[{}] Successfully updated global checkpoint to {}", - leaderShard, leaderGlobalCheckPoint), - updateStatusException -> { - logger.error(new ParameterizedMessage("[{}] Failed to update global checkpoint to {}", - leaderShard, leaderGlobalCheckPoint), updateStatusException); - task.markAsFailed(updateStatusException); - } - )); - prepare(task, params, leaderGlobalCheckPoint); - } else { - task.markAsFailed(e); - } - }; - ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(), - params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler); - coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); - coordinator.start(); - } + fetchGlobalCheckpoint(leaderShard, leaderGlobalCheckPoint -> { + // TODO: check if both indices have the same history uuid + if (leaderGlobalCheckPoint == followGlobalCheckPoint) { + retry(task, params, followGlobalCheckPoint); } else { - task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard")); + assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + + "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; + Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); + Consumer handler = e -> { + if (e == null) { + task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); + prepare(task, params, leaderGlobalCheckPoint); + } else { + task.markAsFailed(e); + } + }; + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(), + params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler); + coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); + coordinator.start(); } - }, task::markAsFailed)); + }, task::markAsFailed); } - private void retry(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) { + private void retry(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -151,6 +136,24 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor errorHandler) { + client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { + IndexStats indexStats = r.getIndex(shardId.getIndexName()); + Optional filteredShardStats = Arrays.stream(indexStats.getShards()) + .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId)) + .filter(shardStats -> shardStats.getShardRouting().primary()) + .findAny(); + + if (filteredShardStats.isPresent()) { + // Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1. + final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint()); + handler.accept(globalCheckPoint); + } else { + errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + } + }, errorHandler)); + } + static class ChunksCoordinator { private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 0ea0b5c8295..3e781187301 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -23,6 +23,8 @@ */ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; @@ -36,10 +38,12 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; import org.elasticsearch.xpack.core.XPackSettings; @@ -49,6 +53,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; @@ -243,9 +248,28 @@ public class ShardChangesIT extends ESIntegTestCase { final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(ShardFollowTask.NAME + "[c]"); + ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).actionGet(); + assertThat(listTasksResponse.getNodeFailures().size(), equalTo(0)); + assertThat(listTasksResponse.getTaskFailures().size(), equalTo(0)); + + List taskInfos = listTasksResponse.getTasks(); + assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards)); for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); - final ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); + + TaskInfo taskInfo = null; + String expectedId = "id=" + task.getId(); + for (TaskInfo info : taskInfos) { + if (expectedId.equals(info.getDescription())) { + taskInfo = info; + break; + } + } + assertThat(taskInfo, notNullValue()); + ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus(); assertThat(status, notNullValue()); assertThat( status.getProcessedGlobalCheckpoint(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java new file mode 100644 index 00000000000..69787998fbb --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase { + + @Override + protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) throws IOException { + return ShardFollowNodeTask.Status.fromXContent(parser); + } + + @Override + protected ShardFollowNodeTask.Status createTestInstance() { + return new ShardFollowNodeTask.Status(randomNonNegativeLong()); + } + + @Override + protected Writeable.Reader instanceReader() { + return ShardFollowNodeTask.Status::new; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java deleted file mode 100644 index 7013c8058f9..00000000000 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ccr.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractSerializingTestCase; - -import java.io.IOException; - -public class ShardFollowTaskStatusTests extends AbstractSerializingTestCase { - - @Override - protected ShardFollowTask.Status doParseInstance(XContentParser parser) throws IOException { - return ShardFollowTask.Status.fromXContent(parser); - } - - @Override - protected ShardFollowTask.Status createTestInstance() { - ShardFollowTask.Status status = new ShardFollowTask.Status(); - status.setProcessedGlobalCheckpoint(randomNonNegativeLong()); - return status; - } - - @Override - protected Writeable.Reader instanceReader() { - return ShardFollowTask.Status::new; - } -}