diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 5999cc11478..d31aa533b50 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -7,15 +7,35 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.plugins.ActionPlugin.ActionHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; +import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; +import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.security.InternalClient; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -29,7 +49,10 @@ import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTIN */ public final class Ccr { + public static final String CCR_THREAD_POOL_NAME = "ccr"; + private final boolean enabled; + private final Settings settings; private final boolean tribeNode; private final boolean tribeNodeClient; @@ -39,17 +62,57 @@ public final class Ccr { * @param settings the settings */ public Ccr(final Settings settings) { + this.settings = settings; this.enabled = CCR_ENABLED_SETTING.get(settings); this.tribeNode = XPackPlugin.isTribeNode(settings); this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings); } + public List> createPersistentTasksExecutors(InternalClient internalClient, ThreadPool threadPool) { + return Collections.singletonList(new ShardFollowTasksExecutor(settings, internalClient, threadPool)); + } + public List> getActions() { - if (!enabled || tribeNodeClient || tribeNode) { + if (enabled == false || tribeNodeClient || tribeNode) { return emptyList(); } - return Collections.singletonList(new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class)); + return Arrays.asList( + new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class), + new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class) + ); + } + + public List getRestHandlers(Settings settings, RestController restController) { + return Arrays.asList( + new RestUnfollowIndexAction(settings, restController), + new RestFollowExistingIndexAction(settings, restController) + ); + } + + public List getNamedWriteables() { + return Arrays.asList( + // Persistent action requests + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ShardFollowTask.NAME, + ShardFollowTask::new), + + // Task statuses + new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowTask.NAME, + ShardFollowTask.Status::new) + ); + } + + public List getNamedXContent() { + return Arrays.asList( + // Persistent action requests + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME), + ShardFollowTask::fromXContent), + + // Task statuses + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ShardFollowTask.NAME), + ShardFollowTask.Status::fromXContent) + ); } /** @@ -75,4 +138,15 @@ public final class Ccr { } } + public List> getExecutorBuilders(Settings settings) { + if (enabled == false || tribeNode || tribeNodeClient) { + return Collections.emptyList(); + } + + FixedExecutorBuilder ccrTp = new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, + 32, 100, "xpack.ccr.ccr_thread_pool"); + + return Collections.singletonList(ccrTp); + } + } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java new file mode 100644 index 00000000000..ea711fb4e6d --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.security.InternalClient; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class FollowExistingIndexAction extends Action { + + public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index"; + + private FollowExistingIndexAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + private String leaderIndex; + private String followIndex; + + public String getLeaderIndex() { + return leaderIndex; + } + + public void setLeaderIndex(String leaderIndex) { + this.leaderIndex = leaderIndex; + } + + public String getFollowIndex() { + return followIndex; + } + + public void setFollowIndex(String followIndex) { + this.followIndex = followIndex; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderIndex = in.readString(); + followIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderIndex); + out.writeString(followIndex); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, Action action) { + super(client, action, new Request()); + } + } + + public static class Response extends ActionResponse { + + } + + public static class TransportAction extends HandledTransportAction { + + private final InternalClient client; + private final PersistentTasksService persistentTasksService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client, + PersistentTasksService persistentTasksService) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.persistentTasksService = persistentTasksService; + } + + + @Override + protected void doExecute(Request request, ActionListener listener) { + client.admin().cluster().state(new ClusterStateRequest(), new ActionListener() { + @Override + public void onResponse(ClusterStateResponse clusterStateResponse) { + IndexMetaData leaderIndexMetadata = clusterStateResponse.getState().getMetaData() + .index(request.leaderIndex); + if (leaderIndexMetadata == null) { + listener.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist")); + } + + IndexMetaData followIndexMetadata = clusterStateResponse.getState().getMetaData() + .index(request.followIndex); + if (followIndexMetadata == null) { + listener.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist")); + } + + if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) { + listener.onFailure(new IllegalArgumentException("leader index primary shards [" + + leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " + + "shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]")); + return; + } + + // TODO: other validation checks + + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId)); + persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); + finalizeResponse(); + } + + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); + } + } + } + + if (error == null) { + // include task ids? + listener.onResponse(new Response()); + } else { + // TODO: cancel all started tasks + listener.onFailure(error); + } + } + } + + }); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java new file mode 100644 index 00000000000..073bf7b6cfe --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -0,0 +1,178 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; + +import java.io.IOException; +import java.util.Objects; + +public class ShardFollowTask implements PersistentTaskParams { + + public static final String NAME = "shard_follow"; + + static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index"); + static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid"); + static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard"); + static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index"); + static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); + static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); + + public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + (a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]), + new ShardId((String) a[3], (String) a[4], (int) a[5]))); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); + } + + public ShardFollowTask(ShardId followShardId, ShardId leaderShardId) { + this.followShardId = followShardId; + this.leaderShardId = leaderShardId; + } + + public ShardFollowTask(StreamInput in) throws IOException { + this.followShardId = ShardId.readShardId(in); + this.leaderShardId = ShardId.readShardId(in); + } + + public static ShardFollowTask fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private final ShardId followShardId; + private final ShardId leaderShardId; + + public ShardId getFollowShardId() { + return followShardId; + } + + public ShardId getLeaderShardId() { + return leaderShardId; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + followShardId.writeTo(out); + leaderShardId.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); + builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); + builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); + builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); + builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); + builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardFollowTask that = (ShardFollowTask) o; + return Objects.equals(followShardId, that.followShardId) && + Objects.equals(leaderShardId, that.leaderShardId); + } + + @Override + public int hashCode() { + return Objects.hash(followShardId, leaderShardId); + } + + public String toString() { + return Strings.toString(this); + } + + public static class Status implements Task.Status { + + static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + + static final ObjectParser PARSER = new ObjectParser<>(NAME, Status::new); + + static { + PARSER.declareLong(Status::setProcessedGlobalCheckpoint, PROCESSED_GLOBAL_CHECKPOINT_FIELD); + } + + private long processedGlobalCheckpoint; + + public Status() { + } + + public Status(StreamInput in) throws IOException { + processedGlobalCheckpoint = in.readZLong(); + } + + public long getProcessedGlobalCheckpoint() { + return processedGlobalCheckpoint; + } + + public void setProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { + this.processedGlobalCheckpoint = processedGlobalCheckpoint; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(processedGlobalCheckpoint); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); + return builder.endObject(); + } + + public static Status fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + } + + @Override + public int hashCode() { + return Objects.hash(processedGlobalCheckpoint); + } + + public String toString() { + return Strings.toString(this); + } + } +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java new file mode 100644 index 00000000000..faba0287714 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -0,0 +1,222 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.security.InternalClient; + +import java.util.Arrays; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + + // TODO: turn into cluster wide settings: + private static final long BATCH_SIZE = 256; + private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); + + private final InternalClient client; + private final ThreadPool threadPool; + + public ShardFollowTasksExecutor(Settings settings, InternalClient client, ThreadPool threadPool) { + super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME); + this.client = client; + this.threadPool = threadPool; + } + + @Override + public void validate(ShardFollowTask params, ClusterState clusterState) { + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getLeaderShardId().getIndex()); + if (routingTable.shard(params.getLeaderShardId().id()).primaryShard().started() == false) { + throw new IllegalArgumentException("Not all copies of leader shard are started"); + } + + routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex()); + if (routingTable.shard(params.getFollowShardId().id()).primaryShard().started() == false) { + throw new IllegalArgumentException("Not all copies of follow shard are started"); + } + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) { + final ShardId shardId = params.getLeaderShardId(); + final ShardFollowTask.Status shardFollowStatus = (ShardFollowTask.Status) status; + final long followGlobalCheckPoint; + if (shardFollowStatus != null) { + followGlobalCheckPoint = shardFollowStatus.getProcessedGlobalCheckpoint(); + } else { + followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED; + } + prepare(task, shardId, followGlobalCheckPoint); + } + + private void prepare(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) { + if (task.getState() != AllocatedPersistentTask.State.STARTED) { + // TODO: need better cancellation control + return; + } + + client.admin().indices().stats(new IndicesStatsRequest().indices(leaderShard.getIndexName()), ActionListener.wrap(r -> { + Optional leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards()) + .filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard)) + .filter(shardStats -> shardStats.getShardRouting().primary()) + .findAny(); + + if (leaderShardStats.isPresent()) { + final long leaderGlobalCheckPoint = leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint(); + // TODO: check if both indices have the same history uuid + + if (leaderGlobalCheckPoint == followGlobalCheckPoint) { + retry(task, leaderShard, followGlobalCheckPoint); + } else { + assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + leaderGlobalCheckPoint + + "] is not below leaderGlobalCheckPoint [" + followGlobalCheckPoint + "]"; + ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, e -> { + if (e == null) { + ShardFollowTask.Status newStatus = new ShardFollowTask.Status(); + newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint); + task.updatePersistentStatus(newStatus, ActionListener.wrap( + persistentTask -> prepare(task, leaderShard, leaderGlobalCheckPoint), task::markAsFailed) + ); + } else { + task.markAsFailed(e); + } + }); + coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); + coordinator.processChuck(); + } + } else { + task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard")); + } + }, task::markAsFailed)); + } + + private void retry(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) { + threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + task.markAsFailed(e); + } + + @Override + protected void doRun() throws Exception { + prepare(task, leaderShard, followGlobalCheckPoint); + } + }); + } + + class ChunksCoordinator { + + private final ShardId leaderShard; + private final Consumer handler; + private final Queue chunks = new ConcurrentLinkedQueue<>(); + + ChunksCoordinator(ShardId leaderShard, Consumer handler) { + this.leaderShard = leaderShard; + this.handler = handler; + } + + void createChucks(long from, long to) { + for (long i = from; i <= to; i += BATCH_SIZE) { + long v2 = i + BATCH_SIZE < to ? i + BATCH_SIZE : to; + chunks.add(new long[]{i, v2}); + } + } + + void processChuck() { + long[] chuck = chunks.poll(); + if (chuck == null) { + handler.accept(null); + return; + } + ChunkProcessor processor = new ChunkProcessor(leaderShard, (success, e) -> { + if (success) { + processChuck(); + } else { + handler.accept(e); + } + }); + processor.start(chuck[0], chuck[1]); + } + + } + + class ChunkProcessor { + + private final ShardId shardId; + private final BiConsumer handler; + + ChunkProcessor(ShardId shardId, BiConsumer handler) { + this.shardId = shardId; + this.handler = handler; + } + + void start(long from, long to) { + ShardChangesAction.Request request = new ShardChangesAction.Request(shardId); + request.setMinSeqNo(from); + request.setMaxSeqNo(to); + client.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(ShardChangesAction.Response response) { + handleResponse(response); + } + + @Override + public void onFailure(Exception e) { + handler.accept(false, e); + } + }); + } + + void handleResponse(ShardChangesAction.Response response) { + threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + handler.accept(false, e); + } + + @Override + protected void doRun() throws Exception { + // TODO: Wait for api to be available that accepts raw translog operations and log translog operations for now: + for (Translog.Operation operation : response.getOperations()) { + if (operation instanceof Translog.Index) { + Translog.Index indexOp = (Translog.Index) operation; + logger.debug("index op [{}], [{}]", indexOp.seqNo(), indexOp.id()); + } else if (operation instanceof Translog.Delete) { + Engine.Delete deleteOp = (Engine.Delete) operation; + logger.debug("delete op [{}], [{}]", deleteOp.seqNo(), deleteOp.id()); + } else if (operation instanceof Translog.NoOp) { + Engine.NoOp noOp = (Engine.NoOp) operation; + logger.debug("no op [{}], [{}]", noOp.seqNo(), noOp.reason()); + } + } + handler.accept(true, null); + } + }); + } + + + } +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java new file mode 100644 index 00000000000..af21c8a66d7 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.security.InternalClient; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class UnfollowIndexAction extends Action { + + public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index"; + + private UnfollowIndexAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + private String followIndex; + + public String getFollowIndex() { + return followIndex; + } + + public void setFollowIndex(String followIndex) { + this.followIndex = followIndex; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + followIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followIndex); + } + } + + public static class Response extends ActionResponse { + + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + + public static class TransportAction extends HandledTransportAction { + + private final InternalClient client; + private final PersistentTasksService persistentTasksService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client, + PersistentTasksService persistentTasksService) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.persistentTasksService = persistentTasksService; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> { + IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex); + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + persistentTasksService.cancelPersistentTask(taskId, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); + finalizeResponse(); + } + + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); + } + } + } + + if (error == null) { + // include task ids? + listener.onResponse(new Response()); + } else { + // TODO: cancel all started tasks + listener.onFailure(error); + } + } + } + }); + } + }, listener::onFailure)); + } + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java new file mode 100644 index 00000000000..fa53c84a8a2 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request; +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response; + +// TODO: change to confirm with API design +public class RestFollowExistingIndexAction extends BaseRestHandler { + + public RestFollowExistingIndexAction(Settings settings, RestController controller) { + super(settings); + // TODO: figure out why: '/{follow_index}/_xpack/ccr/_follow' path clashes with create index api. + controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_follow", this); + } + + @Override + public String getName() { + return "xpack_ccr_follow_index_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setLeaderIndex(restRequest.param("leader_index")); + request.setFollowIndex(restRequest.param("follow_index")); + return channel -> client.execute(INSTANCE, request, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .endObject()); + + } + }); + } +} \ No newline at end of file diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java new file mode 100644 index 00000000000..1233097c46e --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request; +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response; + +// TODO: change to confirm with API design +public class RestUnfollowIndexAction extends BaseRestHandler { + + public RestUnfollowIndexAction(Settings settings, RestController controller) { + super(settings); + // TODO: figure out why: '/{follow_index}/_xpack/ccr/_unfollow' path clashes with create index api. + controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_unfollow", this); + } + + @Override + public String getName() { + return "xpack_ccr_unfollow_index_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setFollowIndex(restRequest.param("follow_index")); + return channel -> client.execute(INSTANCE, request, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .endObject()); + + } + }); + } +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 7d7937007a8..72a92b69d18 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -25,20 +25,30 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0) public class ShardChangesIT extends ESIntegTestCase { @@ -133,4 +143,84 @@ public class ShardChangesIT extends ESIntegTestCase { assertThat(operation.id(), equalTo("5")); } + + public void testFollowIndex() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards))); + assertAcked(client().admin().indices().prepareCreate("index2") + .setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards))); + ensureGreen("index1", "index2"); + + FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); + + int numDocs = randomIntBetween(2, 64); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + final Map numDocsPerShard = new HashMap<>(); + ShardStats[] shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (ShardStats shardStat : shardStats) { + if (shardStat.getShardRouting().primary()) { + long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1; + numDocsPerShard.put(shardStat.getShardRouting().shardId(), value); + } + } + + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + + for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { + ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); + ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); + assertThat(status, notNullValue()); + assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); + } + }); + + numDocs = randomIntBetween(2, 64); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + numDocsPerShard.clear(); + shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (ShardStats shardStat : shardStats) { + if (shardStat.getShardRouting().primary()) { + long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1; + numDocsPerShard.put(shardStat.getShardRouting().shardId(), value); + } + } + + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + + for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { + ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); + ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); + assertThat(status, notNullValue()); + assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); + } + }); + + UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); + unfollowRequest.setFollowIndex("index2"); + client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(0)); + }); + } + } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java new file mode 100644 index 00000000000..7013c8058f9 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskStatusTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class ShardFollowTaskStatusTests extends AbstractSerializingTestCase { + + @Override + protected ShardFollowTask.Status doParseInstance(XContentParser parser) throws IOException { + return ShardFollowTask.Status.fromXContent(parser); + } + + @Override + protected ShardFollowTask.Status createTestInstance() { + ShardFollowTask.Status status = new ShardFollowTask.Status(); + status.setProcessedGlobalCheckpoint(randomNonNegativeLong()); + return status; + } + + @Override + protected Writeable.Reader instanceReader() { + return ShardFollowTask.Status::new; + } +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java new file mode 100644 index 00000000000..84f83fc5660 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class ShardFollowTaskTests extends AbstractSerializingTestCase { + + @Override + protected ShardFollowTask doParseInstance(XContentParser parser) throws IOException { + return ShardFollowTask.fromXContent(parser); + } + + @Override + protected ShardFollowTask createTestInstance() { + return new ShardFollowTask( + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)) + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return ShardFollowTask::new; + } +}