Implement translog operation bulk action
This commit adds a bulk action for apply translog operations in bulk to an index. This action is then used in the persistent task for CCR to apply shard changes from a leader shard. Relates #3147
This commit is contained in:
parent
11aa83011c
commit
be5f83a6bd
|
@ -28,6 +28,8 @@ import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
|||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
|
||||
|
@ -79,8 +81,8 @@ public final class Ccr {
|
|||
return Arrays.asList(
|
||||
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
|
||||
new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class),
|
||||
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class)
|
||||
);
|
||||
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
|
||||
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class));
|
||||
}
|
||||
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController) {
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -139,44 +140,44 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
}
|
||||
}
|
||||
|
||||
public static class Response extends ActionResponse {
|
||||
public static final class Response extends ActionResponse {
|
||||
|
||||
private List<Translog.Operation> operations;
|
||||
private Translog.Operation[] operations;
|
||||
|
||||
Response() {
|
||||
}
|
||||
|
||||
Response(List<Translog.Operation> operations) {
|
||||
Response(final Translog.Operation[] operations) {
|
||||
this.operations = operations;
|
||||
}
|
||||
|
||||
public List<Translog.Operation> getOperations() {
|
||||
public Translog.Operation[] getOperations() {
|
||||
return operations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
public void readFrom(final StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
operations = Translog.readOperations(in);
|
||||
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
Translog.writeOperations(out, operations);
|
||||
out.writeArray(Translog.Operation::writeOperation, operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Response response = (Response) o;
|
||||
return Objects.equals(operations, response.operations);
|
||||
final Response response = (Response) o;
|
||||
return Arrays.equals(operations, response.operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(operations);
|
||||
return Arrays.hashCode(operations);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,7 +210,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.getShard().id());
|
||||
|
||||
List<Translog.Operation> operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo);
|
||||
Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo);
|
||||
return new Response(operations);
|
||||
}
|
||||
|
||||
|
@ -233,7 +234,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
|
||||
}
|
||||
|
||||
static List<Translog.Operation> getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException {
|
||||
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
|
||||
|
||||
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException {
|
||||
if (indexShard.state() != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
|
||||
}
|
||||
|
@ -250,7 +253,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
|||
}
|
||||
|
||||
if (tracker.getCheckpoint() == maxSeqNo) {
|
||||
return operations;
|
||||
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
|
||||
} else {
|
||||
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
|
||||
"] found, tracker checkpoint [" + tracker.getCheckpoint() + "]";
|
||||
|
|
|
@ -14,13 +14,14 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
|
||||
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
|
||||
|
@ -28,7 +29,6 @@ import java.util.Arrays;
|
|||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
|
||||
|
@ -61,7 +61,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
|
||||
final ShardId shardId = params.getLeaderShardId();
|
||||
final ShardFollowTask.Status shardFollowStatus = (ShardFollowTask.Status) status;
|
||||
final long followGlobalCheckPoint;
|
||||
if (shardFollowStatus != null) {
|
||||
|
@ -69,10 +68,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
} else {
|
||||
followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED;
|
||||
}
|
||||
prepare(task, shardId, followGlobalCheckPoint);
|
||||
prepare(task, params.getLeaderShardId(), params.getFollowShardId(), followGlobalCheckPoint);
|
||||
}
|
||||
|
||||
private void prepare(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) {
|
||||
void prepare(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long followGlobalCheckPoint) {
|
||||
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
|
||||
// TODO: need better cancellation control
|
||||
return;
|
||||
|
@ -87,18 +86,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
if (leaderShardStats.isPresent()) {
|
||||
final long leaderGlobalCheckPoint = leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint();
|
||||
// TODO: check if both indices have the same history uuid
|
||||
|
||||
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
|
||||
retry(task, leaderShard, followGlobalCheckPoint);
|
||||
retry(task, leaderShard, followerShard, followGlobalCheckPoint);
|
||||
} else {
|
||||
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + leaderGlobalCheckPoint +
|
||||
"] is not below leaderGlobalCheckPoint [" + followGlobalCheckPoint + "]";
|
||||
ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, e -> {
|
||||
ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, followerShard, e -> {
|
||||
if (e == null) {
|
||||
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
|
||||
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
|
||||
task.updatePersistentStatus(newStatus, ActionListener.wrap(
|
||||
persistentTask -> prepare(task, leaderShard, leaderGlobalCheckPoint), task::markAsFailed)
|
||||
persistentTask -> prepare(task, leaderShard, followerShard, leaderGlobalCheckPoint), task::markAsFailed)
|
||||
);
|
||||
} else {
|
||||
task.markAsFailed(e);
|
||||
|
@ -113,7 +111,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
}, task::markAsFailed));
|
||||
}
|
||||
|
||||
private void retry(AllocatedPersistentTask task, ShardId leaderShard, long followGlobalCheckPoint) {
|
||||
private void retry(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long followGlobalCheckPoint) {
|
||||
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
@ -122,7 +120,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
prepare(task, leaderShard, followGlobalCheckPoint);
|
||||
prepare(task, leaderShard, followerShard, followGlobalCheckPoint);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -130,11 +128,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
class ChunksCoordinator {
|
||||
|
||||
private final ShardId leaderShard;
|
||||
private final ShardId followerShard;
|
||||
private final Consumer<Exception> handler;
|
||||
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
|
||||
|
||||
ChunksCoordinator(ShardId leaderShard, Consumer<Exception> handler) {
|
||||
ChunksCoordinator(ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
|
||||
this.leaderShard = leaderShard;
|
||||
this.followerShard = followerShard;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
|
@ -146,35 +146,37 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
}
|
||||
|
||||
void processChuck() {
|
||||
long[] chuck = chunks.poll();
|
||||
if (chuck == null) {
|
||||
long[] chunk = chunks.poll();
|
||||
if (chunk == null) {
|
||||
handler.accept(null);
|
||||
return;
|
||||
}
|
||||
ChunkProcessor processor = new ChunkProcessor(leaderShard, (success, e) -> {
|
||||
if (success) {
|
||||
ChunkProcessor processor = new ChunkProcessor(leaderShard, followerShard, e -> {
|
||||
if (e == null) {
|
||||
processChuck();
|
||||
} else {
|
||||
handler.accept(e);
|
||||
}
|
||||
});
|
||||
processor.start(chuck[0], chuck[1]);
|
||||
processor.start(chunk[0], chunk[1]);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ChunkProcessor {
|
||||
|
||||
private final ShardId shardId;
|
||||
private final BiConsumer<Boolean, Exception> handler;
|
||||
private final ShardId leaderShard;
|
||||
private final ShardId followerShard;
|
||||
private final Consumer<Exception> handler;
|
||||
|
||||
ChunkProcessor(ShardId shardId, BiConsumer<Boolean, Exception> handler) {
|
||||
this.shardId = shardId;
|
||||
ChunkProcessor(ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
|
||||
this.leaderShard = leaderShard;
|
||||
this.followerShard = followerShard;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
void start(long from, long to) {
|
||||
ShardChangesAction.Request request = new ShardChangesAction.Request(shardId);
|
||||
ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard);
|
||||
request.setMinSeqNo(from);
|
||||
request.setMaxSeqNo(to);
|
||||
client.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
|
||||
|
@ -185,7 +187,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handler.accept(false, e);
|
||||
assert e != null;
|
||||
handler.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -194,29 +197,29 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handler.accept(false, e);
|
||||
assert e != null;
|
||||
handler.accept(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
// TODO: Wait for api to be available that accepts raw translog operations and log translog operations for now:
|
||||
for (Translog.Operation operation : response.getOperations()) {
|
||||
if (operation instanceof Translog.Index) {
|
||||
Translog.Index indexOp = (Translog.Index) operation;
|
||||
logger.debug("index op [{}], [{}]", indexOp.seqNo(), indexOp.id());
|
||||
} else if (operation instanceof Translog.Delete) {
|
||||
Engine.Delete deleteOp = (Engine.Delete) operation;
|
||||
logger.debug("delete op [{}], [{}]", deleteOp.seqNo(), deleteOp.id());
|
||||
} else if (operation instanceof Translog.NoOp) {
|
||||
Engine.NoOp noOp = (Engine.NoOp) operation;
|
||||
logger.debug("no op [{}], [{}]", noOp.seqNo(), noOp.reason());
|
||||
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
|
||||
client.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
|
||||
@Override
|
||||
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
|
||||
handler.accept(null);
|
||||
}
|
||||
}
|
||||
handler.accept(true, null);
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
assert e != null;
|
||||
handler.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class BulkShardOperationsAction
|
||||
extends Action<BulkShardOperationsRequest, BulkShardOperationsResponse, BulkShardOperationsRequestBuilder> {
|
||||
|
||||
public static final BulkShardOperationsAction INSTANCE = new BulkShardOperationsAction();
|
||||
public static final String NAME = "indices:data/write/bulk_shard_operations[s]";
|
||||
|
||||
private BulkShardOperationsAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkShardOperationsRequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new BulkShardOperationsRequestBuilder(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkShardOperationsResponse newResponse() {
|
||||
return new BulkShardOperationsResponse();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
|
||||
|
||||
private Translog.Operation[] operations;
|
||||
|
||||
public BulkShardOperationsRequest() {
|
||||
|
||||
}
|
||||
|
||||
public BulkShardOperationsRequest(final ShardId shardId, final Translog.Operation[] operations) {
|
||||
super(shardId);
|
||||
setRefreshPolicy(RefreshPolicy.NONE);
|
||||
this.operations = operations;
|
||||
}
|
||||
|
||||
public Translog.Operation[] getOperations() {
|
||||
return operations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(final StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeArray(Translog.Operation::writeOperation, operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BulkShardOperationsRequest{" +
|
||||
"operations=" + operations.length+
|
||||
", shardId=" + shardId +
|
||||
", timeout=" + timeout +
|
||||
", index='" + index + '\'' +
|
||||
", waitForActiveShards=" + waitForActiveShards +
|
||||
'}';
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class BulkShardOperationsRequestBuilder
|
||||
extends ActionRequestBuilder<BulkShardOperationsRequest, BulkShardOperationsResponse, BulkShardOperationsRequestBuilder> {
|
||||
|
||||
public BulkShardOperationsRequestBuilder(final ElasticsearchClient client) {
|
||||
super(client, BulkShardOperationsAction.INSTANCE, new BulkShardOperationsRequest());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.support.WriteResponse;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
|
||||
public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse {
|
||||
|
||||
@Override
|
||||
public void setForcedRefresh(final boolean forcedRefresh) {
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TransportBulkShardOperationsAction
|
||||
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportBulkShardOperationsAction(
|
||||
final Settings settings,
|
||||
final TransportService transportService,
|
||||
final ClusterService clusterService,
|
||||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(
|
||||
settings,
|
||||
BulkShardOperationsAction.NAME,
|
||||
transportService,
|
||||
clusterService,
|
||||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
BulkShardOperationsRequest::new,
|
||||
BulkShardOperationsRequest::new,
|
||||
ThreadPool.Names.BULK);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
|
||||
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
|
||||
final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY);
|
||||
return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
|
||||
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
|
||||
final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA);
|
||||
return new WriteReplicaResult<>(request, location, null, replica, logger);
|
||||
}
|
||||
|
||||
private Translog.Location applyTranslogOperations(
|
||||
final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
|
||||
Translog.Location location = null;
|
||||
for (final Translog.Operation operation : request.getOperations()) {
|
||||
final Engine.Result result = shard.applyTranslogOperation(operation, origin, m -> {});
|
||||
assert result.getSeqNo() == operation.seqNo();
|
||||
assert result.hasFailure() == false;
|
||||
location = locationToSync(location, result.getTranslogLocation());
|
||||
}
|
||||
assert request.getOperations().length == 0 || location != null;
|
||||
return location;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BulkShardOperationsResponse newResponseInstance() {
|
||||
return new BulkShardOperationsResponse();
|
||||
}
|
||||
|
||||
}
|
|
@ -24,9 +24,12 @@
|
|||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -40,12 +43,15 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
|||
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -85,8 +91,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
return nodePlugins();
|
||||
}
|
||||
|
||||
// Something like this will emulate what the xdrc persistent task will do for pulling
|
||||
// the changes:
|
||||
// this emulates what the CCR persistent task will do for pulling
|
||||
public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
|
||||
client().admin().indices().prepareCreate("index")
|
||||
.setSettings(Settings.builder().put("index.number_of_shards", 1))
|
||||
|
@ -104,16 +109,16 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
request.setMinSeqNo(0L);
|
||||
request.setMaxSeqNo(globalCheckPoint);
|
||||
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
|
||||
assertThat(response.getOperations().size(), equalTo(3));
|
||||
Translog.Index operation = (Translog.Index) response.getOperations().get(0);
|
||||
assertThat(response.getOperations().length, equalTo(3));
|
||||
Translog.Index operation = (Translog.Index) response.getOperations()[0];
|
||||
assertThat(operation.seqNo(), equalTo(0L));
|
||||
assertThat(operation.id(), equalTo("1"));
|
||||
|
||||
operation = (Translog.Index) response.getOperations().get(1);
|
||||
operation = (Translog.Index) response.getOperations()[1];
|
||||
assertThat(operation.seqNo(), equalTo(1L));
|
||||
assertThat(operation.id(), equalTo("2"));
|
||||
|
||||
operation = (Translog.Index) response.getOperations().get(2);
|
||||
operation = (Translog.Index) response.getOperations()[2];
|
||||
assertThat(operation.seqNo(), equalTo(2L));
|
||||
assertThat(operation.id(), equalTo("3"));
|
||||
|
||||
|
@ -129,98 +134,151 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
request.setMinSeqNo(3L);
|
||||
request.setMaxSeqNo(globalCheckPoint);
|
||||
response = client().execute(ShardChangesAction.INSTANCE, request).get();
|
||||
assertThat(response.getOperations().size(), equalTo(3));
|
||||
operation = (Translog.Index) response.getOperations().get(0);
|
||||
assertThat(response.getOperations().length, equalTo(3));
|
||||
operation = (Translog.Index) response.getOperations()[0];
|
||||
assertThat(operation.seqNo(), equalTo(3L));
|
||||
assertThat(operation.id(), equalTo("3"));
|
||||
|
||||
operation = (Translog.Index) response.getOperations().get(1);
|
||||
operation = (Translog.Index) response.getOperations()[1];
|
||||
assertThat(operation.seqNo(), equalTo(4L));
|
||||
assertThat(operation.id(), equalTo("4"));
|
||||
|
||||
operation = (Translog.Index) response.getOperations().get(2);
|
||||
operation = (Translog.Index) response.getOperations()[2];
|
||||
assertThat(operation.seqNo(), equalTo(5L));
|
||||
assertThat(operation.id(), equalTo("5"));
|
||||
}
|
||||
|
||||
|
||||
public void testFollowIndex() throws Exception {
|
||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||
|
||||
assertAcked(client().admin().indices().prepareCreate("index1")
|
||||
.setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards)));
|
||||
assertAcked(client().admin().indices().prepareCreate("index2")
|
||||
.setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards)));
|
||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap());
|
||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
|
||||
final String followerIndexSettings =
|
||||
getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
||||
|
||||
ensureGreen("index1", "index2");
|
||||
|
||||
FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
|
||||
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
|
||||
followRequest.setLeaderIndex("index1");
|
||||
followRequest.setFollowIndex("index2");
|
||||
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get();
|
||||
|
||||
int numDocs = randomIntBetween(2, 64);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
|
||||
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
||||
for (int i = 0; i < firstBatchNumDocs; i++) {
|
||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
|
||||
final Map<ShardId, Long> numDocsPerShard = new HashMap<>();
|
||||
ShardStats[] shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
|
||||
for (ShardStats shardStat : shardStats) {
|
||||
if (shardStat.getShardRouting().primary()) {
|
||||
long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1;
|
||||
numDocsPerShard.put(shardStat.getShardRouting().shardId(), value);
|
||||
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
|
||||
final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
|
||||
for (final ShardStats shardStats : firstBatchShardStats) {
|
||||
if (shardStats.getShardRouting().primary()) {
|
||||
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
|
||||
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
|
||||
}
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
|
||||
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
|
||||
|
||||
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
|
||||
ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
|
||||
assertThat(status, notNullValue());
|
||||
assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
|
||||
}
|
||||
});
|
||||
|
||||
numDocs = randomIntBetween(2, 64);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
|
||||
for (int i = 0; i < firstBatchNumDocs; i++) {
|
||||
assertBusy(assertExpectedDocumentRunnable(i));
|
||||
}
|
||||
|
||||
numDocsPerShard.clear();
|
||||
shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
|
||||
for (ShardStats shardStat : shardStats) {
|
||||
if (shardStat.getShardRouting().primary()) {
|
||||
long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1;
|
||||
numDocsPerShard.put(shardStat.getShardRouting().shardId(), value);
|
||||
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
|
||||
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
|
||||
final ShardStats[] secondBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
|
||||
for (final ShardStats shardStats : secondBatchShardStats) {
|
||||
if (shardStats.getShardRouting().primary()) {
|
||||
final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
|
||||
secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
|
||||
}
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
|
||||
assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard));
|
||||
|
||||
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
|
||||
ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
|
||||
assertThat(status, notNullValue());
|
||||
assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
|
||||
}
|
||||
});
|
||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||
assertBusy(assertExpectedDocumentRunnable(i));
|
||||
}
|
||||
|
||||
UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
|
||||
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
|
||||
unfollowRequest.setFollowIndex("index2");
|
||||
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
|
||||
|
||||
assertBusy(() -> {
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(0));
|
||||
});
|
||||
}
|
||||
|
||||
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
|
||||
return () -> {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
|
||||
|
||||
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
|
||||
final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
|
||||
final ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus();
|
||||
assertThat(status, notNullValue());
|
||||
assertThat(
|
||||
status.getProcessedGlobalCheckpoint(),
|
||||
equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
|
||||
return () -> {
|
||||
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
|
||||
assertTrue(getResponse.isExists());
|
||||
assertTrue((getResponse.getSource().containsKey("f")));
|
||||
assertThat(getResponse.getSource().get("f"), equalTo(value));
|
||||
};
|
||||
}
|
||||
|
||||
private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) throws IOException {
|
||||
final String settings;
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("settings");
|
||||
{
|
||||
builder.field("index.number_of_shards", numberOfPrimaryShards);
|
||||
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
||||
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
builder.startObject("mappings");
|
||||
{
|
||||
builder.startObject("doc");
|
||||
{
|
||||
builder.startObject("properties");
|
||||
{
|
||||
builder.startObject("f");
|
||||
{
|
||||
builder.field("type", "integer");
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
settings = builder.string();
|
||||
}
|
||||
return settings;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ import org.elasticsearch.index.translog.Translog;
|
|||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
@ -49,12 +49,12 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
|
|||
int min = randomIntBetween(0, numWrites - 1);
|
||||
int max = randomIntBetween(min, numWrites - 1);
|
||||
|
||||
final List<Translog.Operation> operations = ShardChangesAction.getOperationsBetween(indexShard, min, max);
|
||||
final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max);
|
||||
/*
|
||||
* We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
|
||||
* generations) so the best we can assert is that we see the expected operations.
|
||||
*/
|
||||
final Set<Long> seenSeqNos = operations.stream().map(Translog.Operation::seqNo).collect(Collectors.toSet());
|
||||
final Set<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toSet());
|
||||
final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
|
||||
assertThat(seenSeqNos, equalTo(expectedSeqNos));
|
||||
}
|
||||
|
|
|
@ -5,25 +5,17 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardChangesAction.Response> {
|
||||
|
||||
@Override
|
||||
protected ShardChangesAction.Response createTestInstance() {
|
||||
int numOps = randomInt(8);
|
||||
List<Translog.Operation> operations = new ArrayList<>(numOps);
|
||||
final int numOps = randomInt(8);
|
||||
final Translog.Operation[] operations = new Translog.Operation[numOps];
|
||||
for (int i = 0; i < numOps; i++) {
|
||||
operations.add(new Translog.NoOp(i, 0, "test"));
|
||||
operations[i] = new Translog.NoOp(i, 0, "test");
|
||||
}
|
||||
return new ShardChangesAction.Response(operations);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue