ccr: Added apis and persistent tasks for following an index and log changes instead of indexing.

This commit is contained in:
Martijn van Groningen 2017-10-23 14:51:29 +02:00
parent 8e0b34b507
commit e6ad1c3e9d
10 changed files with 1107 additions and 2 deletions

View File

@ -7,15 +7,35 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -29,7 +49,10 @@ import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTIN
*/
public final class Ccr {
public static final String CCR_THREAD_POOL_NAME = "ccr";
private final boolean enabled;
private final Settings settings;
private final boolean tribeNode;
private final boolean tribeNodeClient;
@ -39,17 +62,57 @@ public final class Ccr {
* @param settings the settings
*/
public Ccr(final Settings settings) {
this.settings = settings;
this.enabled = CCR_ENABLED_SETTING.get(settings);
this.tribeNode = XPackPlugin.isTribeNode(settings);
this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings);
}
public List<PersistentTasksExecutor<?>> createPersistentTasksExecutors(InternalClient internalClient, ThreadPool threadPool) {
return Collections.singletonList(new ShardFollowTasksExecutor(settings, internalClient, threadPool));
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (!enabled || tribeNodeClient || tribeNode) {
if (enabled == false || tribeNodeClient || tribeNode) {
return emptyList();
}
return Collections.singletonList(new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class));
return Arrays.asList(
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class)
);
}
public List<RestHandler> getRestHandlers(Settings settings, RestController restController) {
return Arrays.asList(
new RestUnfollowIndexAction(settings, restController),
new RestFollowExistingIndexAction(settings, restController)
);
}
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ShardFollowTask.NAME,
ShardFollowTask::new),
// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowTask.NAME,
ShardFollowTask.Status::new)
);
}
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME),
ShardFollowTask::fromXContent),
// Task statuses
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ShardFollowTask.NAME),
ShardFollowTask.Status::fromXContent)
);
}
/**
@ -75,4 +138,15 @@ public final class Ccr {
}
}
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (enabled == false || tribeNode || tribeNodeClient) {
return Collections.emptyList();
}
FixedExecutorBuilder ccrTp = new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME,
32, 100, "xpack.ccr.ccr_thread_pool");
return Collections.singletonList(ccrTp);
}
}

View File

@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.Request,
FollowExistingIndexAction.Response, FollowExistingIndexAction.RequestBuilder> {
public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index";
private FollowExistingIndexAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
private String leaderIndex;
private String followIndex;
public String getLeaderIndex() {
return leaderIndex;
}
public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}
public String getFollowIndex() {
return followIndex;
}
public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, FollowExistingIndexAction.RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse {
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final InternalClient client;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client,
PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
client.admin().cluster().state(new ClusterStateRequest(), new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
IndexMetaData leaderIndexMetadata = clusterStateResponse.getState().getMetaData()
.index(request.leaderIndex);
if (leaderIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
}
IndexMetaData followIndexMetadata = clusterStateResponse.getState().getMetaData()
.index(request.followIndex);
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
}
if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
listener.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
return;
}
// TODO: other validation checks
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId));
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
listener.onResponse(new Response());
} else {
// TODO: cancel all started tasks
listener.onFailure(error);
}
}
}
});
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import java.io.IOException;
import java.util.Objects;
public class ShardFollowTask implements PersistentTaskParams {
public static final String NAME = "shard_follow";
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard");
static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index");
static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid");
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]),
new ShardId((String) a[3], (String) a[4], (int) a[5])));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
}
public ShardFollowTask(ShardId followShardId, ShardId leaderShardId) {
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
}
public ShardFollowTask(StreamInput in) throws IOException {
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
}
public static ShardFollowTask fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
private final ShardId followShardId;
private final ShardId leaderShardId;
public ShardId getFollowShardId() {
return followShardId;
}
public ShardId getLeaderShardId() {
return leaderShardId;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
followShardId.writeTo(out);
leaderShardId.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName());
builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID());
builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id());
builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName());
builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID());
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
return builder.endObject();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId);
}
@Override
public int hashCode() {
return Objects.hash(followShardId, leaderShardId);
}
public String toString() {
return Strings.toString(this);
}
public static class Status implements Task.Status {
static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");
static final ObjectParser<Status, Void> PARSER = new ObjectParser<>(NAME, Status::new);
static {
PARSER.declareLong(Status::setProcessedGlobalCheckpoint, PROCESSED_GLOBAL_CHECKPOINT_FIELD);
}
private long processedGlobalCheckpoint;
public Status() {
}
public Status(StreamInput in) throws IOException {
processedGlobalCheckpoint = in.readZLong();
}
public long getProcessedGlobalCheckpoint() {
return processedGlobalCheckpoint;
}
public void setProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
this.processedGlobalCheckpoint = processedGlobalCheckpoint;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(processedGlobalCheckpoint);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint);
return builder.endObject();
}
public static Status fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return processedGlobalCheckpoint == status.processedGlobalCheckpoint;
}
@Override
public int hashCode() {
return Objects.hash(processedGlobalCheckpoint);
}
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -0,0 +1,222 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Arrays;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
// TODO: turn into cluster wide settings:
private static final long BATCH_SIZE = 256;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private final InternalClient client;
private final ThreadPool threadPool;
public ShardFollowTasksExecutor(Settings settings, InternalClient client, ThreadPool threadPool) {
super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
this.client = client;
this.threadPool = threadPool;
}
@Override
public void validate(ShardFollowTask params, ClusterState clusterState) {
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getLeaderShardId().getIndex());
if (routingTable.shard(params.getLeaderShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of leader shard are started");
}
routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
if (routingTable.shard(params.getFollowShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of follow shard are started");
}
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
final ShardId shardId = params.getLeaderShardId();
final ShardFollowTask.Status shardFollowStatus = (ShardFollowTask.Status) status;
final long followGlobalCheckPoint;
if (shardFollowStatus != null) {
followGlobalCheckPoint = shardFollowStatus.getProcessedGlobalCheckpoint();
} else {
followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED;
}
prepare(task, shardId, followGlobalCheckPoint);
}
private void prepare(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
}
client.admin().indices().stats(new IndicesStatsRequest().indices(leaderShard.getIndexName()), ActionListener.wrap(r -> {
Optional<ShardStats> leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard))
.filter(shardStats -> shardStats.getShardRouting().primary())
.findAny();
if (leaderShardStats.isPresent()) {
final long leaderGlobalCheckPoint = leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint();
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(task, leaderShard, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + leaderGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + followGlobalCheckPoint + "]";
ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, e -> {
if (e == null) {
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
task.updatePersistentStatus(newStatus, ActionListener.wrap(
persistentTask -> prepare(task, leaderShard, leaderGlobalCheckPoint), task::markAsFailed)
);
} else {
task.markAsFailed(e);
}
});
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.processChuck();
}
} else {
task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard"));
}
}, task::markAsFailed));
}
private void retry(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
}
@Override
protected void doRun() throws Exception {
prepare(task, leaderShard, followGlobalCheckPoint);
}
});
}
class ChunksCoordinator {
private final ShardId leaderShard;
private final Consumer<Exception> handler;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
ChunksCoordinator(ShardId leaderShard, Consumer<Exception> handler) {
this.leaderShard = leaderShard;
this.handler = handler;
}
void createChucks(long from, long to) {
for (long i = from; i <= to; i += BATCH_SIZE) {
long v2 = i + BATCH_SIZE < to ? i + BATCH_SIZE : to;
chunks.add(new long[]{i, v2});
}
}
void processChuck() {
long[] chuck = chunks.poll();
if (chuck == null) {
handler.accept(null);
return;
}
ChunkProcessor processor = new ChunkProcessor(leaderShard, (success, e) -> {
if (success) {
processChuck();
} else {
handler.accept(e);
}
});
processor.start(chuck[0], chuck[1]);
}
}
class ChunkProcessor {
private final ShardId shardId;
private final BiConsumer<Boolean, Exception> handler;
ChunkProcessor(ShardId shardId, BiConsumer<Boolean, Exception> handler) {
this.shardId = shardId;
this.handler = handler;
}
void start(long from, long to) {
ShardChangesAction.Request request = new ShardChangesAction.Request(shardId);
request.setMinSeqNo(from);
request.setMaxSeqNo(to);
client.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(ShardChangesAction.Response response) {
handleResponse(response);
}
@Override
public void onFailure(Exception e) {
handler.accept(false, e);
}
});
}
void handleResponse(ShardChangesAction.Response response) {
threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
handler.accept(false, e);
}
@Override
protected void doRun() throws Exception {
// TODO: Wait for api to be available that accepts raw translog operations and log translog operations for now:
for (Translog.Operation operation : response.getOperations()) {
if (operation instanceof Translog.Index) {
Translog.Index indexOp = (Translog.Index) operation;
logger.debug("index op [{}], [{}]", indexOp.seqNo(), indexOp.id());
} else if (operation instanceof Translog.Delete) {
Engine.Delete deleteOp = (Engine.Delete) operation;
logger.debug("delete op [{}], [{}]", deleteOp.seqNo(), deleteOp.id());
} else if (operation instanceof Translog.NoOp) {
Engine.NoOp noOp = (Engine.NoOp) operation;
logger.debug("no op [{}], [{}]", noOp.seqNo(), noOp.reason());
}
}
handler.accept(true, null);
}
});
}
}
}

View File

@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class UnfollowIndexAction extends Action<UnfollowIndexAction.Request, UnfollowIndexAction.Response,
UnfollowIndexAction.RequestBuilder> {
public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index";
private UnfollowIndexAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
private String followIndex;
public String getFollowIndex() {
return followIndex;
}
public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
followIndex = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(followIndex);
}
}
public static class Response extends ActionResponse {
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final InternalClient client;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, InternalClient client,
PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
persistentTasksService.cancelPersistentTask(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
responses.set(shardId, task);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
listener.onResponse(new Response());
} else {
// TODO: cancel all started tasks
listener.onFailure(error);
}
}
}
});
}
}, listener::onFailure));
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response;
// TODO: change to confirm with API design
public class RestFollowExistingIndexAction extends BaseRestHandler {
public RestFollowExistingIndexAction(Settings settings, RestController controller) {
super(settings);
// TODO: figure out why: '/{follow_index}/_xpack/ccr/_follow' path clashes with create index api.
controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_follow", this);
}
@Override
public String getName() {
return "xpack_ccr_follow_index_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setLeaderIndex(restRequest.param("leader_index"));
request.setFollowIndex(restRequest.param("follow_index"));
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response;
// TODO: change to confirm with API design
public class RestUnfollowIndexAction extends BaseRestHandler {
public RestUnfollowIndexAction(Settings settings, RestController controller) {
super(settings);
// TODO: figure out why: '/{follow_index}/_xpack/ccr/_unfollow' path clashes with create index api.
controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_unfollow", this);
}
@Override
public String getName() {
return "xpack_ccr_unfollow_index_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setFollowIndex(restRequest.param("follow_index"));
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
}
}

View File

@ -25,20 +25,30 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
public class ShardChangesIT extends ESIntegTestCase {
@ -133,4 +143,84 @@ public class ShardChangesIT extends ESIntegTestCase {
assertThat(operation.id(), equalTo("5"));
}
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards)));
assertAcked(client().admin().indices().prepareCreate("index2")
.setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards)));
ensureGreen("index1", "index2");
FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
followRequest.setLeaderIndex("index1");
followRequest.setFollowIndex("index2");
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get();
int numDocs = randomIntBetween(2, 64);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}
final Map<ShardId, Long> numDocsPerShard = new HashMap<>();
ShardStats[] shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (ShardStats shardStat : shardStats) {
if (shardStat.getShardRouting().primary()) {
long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1;
numDocsPerShard.put(shardStat.getShardRouting().shardId(), value);
}
}
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
assertThat(status, notNullValue());
assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
}
});
numDocs = randomIntBetween(2, 64);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}
numDocsPerShard.clear();
shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (ShardStats shardStat : shardStats) {
if (shardStat.getShardRouting().primary()) {
long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1;
numDocsPerShard.put(shardStat.getShardRouting().shardId(), value);
}
}
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
assertThat(status, notNullValue());
assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
}
});
UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex("index2");
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
});
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class ShardFollowTaskStatusTests extends AbstractSerializingTestCase<ShardFollowTask.Status> {
@Override
protected ShardFollowTask.Status doParseInstance(XContentParser parser) throws IOException {
return ShardFollowTask.Status.fromXContent(parser);
}
@Override
protected ShardFollowTask.Status createTestInstance() {
ShardFollowTask.Status status = new ShardFollowTask.Status();
status.setProcessedGlobalCheckpoint(randomNonNegativeLong());
return status;
}
@Override
protected Writeable.Reader<ShardFollowTask.Status> instanceReader() {
return ShardFollowTask.Status::new;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollowTask> {
@Override
protected ShardFollowTask doParseInstance(XContentParser parser) throws IOException {
return ShardFollowTask.fromXContent(parser);
}
@Override
protected ShardFollowTask createTestInstance() {
return new ShardFollowTask(
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5))
);
}
@Override
protected Writeable.Reader<ShardFollowTask> instanceReader() {
return ShardFollowTask::new;
}
}