From be5f83a6bdc8452de25ea06dddb0c507fa5dda56 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 29 Nov 2017 07:29:56 -0500 Subject: [PATCH] Implement translog operation bulk action This commit adds a bulk action for apply translog operations in bulk to an index. This action is then used in the persistent task for CCR to apply shard changes from a leader shard. Relates #3147 --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 6 +- .../xpack/ccr/action/ShardChangesAction.java | 33 ++-- .../ccr/action/ShardFollowTasksExecutor.java | 81 ++++---- .../bulk/BulkShardOperationsAction.java | 31 +++ .../bulk/BulkShardOperationsRequest.java | 57 ++++++ .../BulkShardOperationsRequestBuilder.java | 18 ++ .../bulk/BulkShardOperationsResponse.java | 18 ++ .../TransportBulkShardOperationsAction.java | 84 +++++++++ .../xpack/ccr/ShardChangesIT.java | 178 ++++++++++++------ .../ccr/action/ShardChangesActionTests.java | 6 +- .../ccr/action/ShardChangesResponseTests.java | 14 +- 11 files changed, 396 insertions(+), 130 deletions(-) create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index e58b9640ea1..2c8d4976312 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -28,6 +28,8 @@ import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; @@ -79,8 +81,8 @@ public final class Ccr { return Arrays.asList( new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class), - new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class) - ); + new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), + new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class)); } public List getRestHandlers(Settings settings, RestController restController) { diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index d48610a45bc..17c026720cb 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -139,44 +140,44 @@ public class ShardChangesAction extends Action operations; + private Translog.Operation[] operations; Response() { } - Response(List operations) { + Response(final Translog.Operation[] operations) { this.operations = operations; } - public List getOperations() { + public Translog.Operation[] getOperations() { return operations; } @Override - public void readFrom(StreamInput in) throws IOException { + public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - operations = Translog.readOperations(in); + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - Translog.writeOperations(out, operations); + out.writeArray(Translog.Operation::writeOperation, operations); } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - Response response = (Response) o; - return Objects.equals(operations, response.operations); + final Response response = (Response) o; + return Arrays.equals(operations, response.operations); } @Override public int hashCode() { - return Objects.hash(operations); + return Arrays.hashCode(operations); } } @@ -209,7 +210,7 @@ public class ShardChangesAction extends Action operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo); + Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo); return new Response(operations); } @@ -233,7 +234,9 @@ public class ShardChangesAction extends Action getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException { + private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + + static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } @@ -250,7 +253,7 @@ public class ShardChangesAction extends Action { @@ -61,7 +61,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, followerShard, e -> { if (e == null) { ShardFollowTask.Status newStatus = new ShardFollowTask.Status(); newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint); task.updatePersistentStatus(newStatus, ActionListener.wrap( - persistentTask -> prepare(task, leaderShard, leaderGlobalCheckPoint), task::markAsFailed) + persistentTask -> prepare(task, leaderShard, followerShard, leaderGlobalCheckPoint), task::markAsFailed) ); } else { task.markAsFailed(e); @@ -113,7 +111,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler; private final Queue chunks = new ConcurrentLinkedQueue<>(); - ChunksCoordinator(ShardId leaderShard, Consumer handler) { + ChunksCoordinator(ShardId leaderShard, ShardId followerShard, Consumer handler) { this.leaderShard = leaderShard; + this.followerShard = followerShard; this.handler = handler; } @@ -146,35 +146,37 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - if (success) { + ChunkProcessor processor = new ChunkProcessor(leaderShard, followerShard, e -> { + if (e == null) { processChuck(); } else { handler.accept(e); } }); - processor.start(chuck[0], chuck[1]); + processor.start(chunk[0], chunk[1]); } } class ChunkProcessor { - private final ShardId shardId; - private final BiConsumer handler; + private final ShardId leaderShard; + private final ShardId followerShard; + private final Consumer handler; - ChunkProcessor(ShardId shardId, BiConsumer handler) { - this.shardId = shardId; + ChunkProcessor(ShardId leaderShard, ShardId followerShard, Consumer handler) { + this.leaderShard = leaderShard; + this.followerShard = followerShard; this.handler = handler; } void start(long from, long to) { - ShardChangesAction.Request request = new ShardChangesAction.Request(shardId); + ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); request.setMinSeqNo(from); request.setMaxSeqNo(to); client.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { @@ -185,7 +187,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor() { + @Override + public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { + handler.accept(null); } - } - handler.accept(true, null); + + @Override + public void onFailure(final Exception e) { + assert e != null; + handler.accept(e); + } + }); } }); } - } + } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java new file mode 100644 index 00000000000..d14bb6345a4 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class BulkShardOperationsAction + extends Action { + + public static final BulkShardOperationsAction INSTANCE = new BulkShardOperationsAction(); + public static final String NAME = "indices:data/write/bulk_shard_operations[s]"; + + private BulkShardOperationsAction() { + super(NAME); + } + + @Override + public BulkShardOperationsRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new BulkShardOperationsRequestBuilder(client); + } + + @Override + public BulkShardOperationsResponse newResponse() { + return new BulkShardOperationsResponse(); + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java new file mode 100644 index 00000000000..ef9d27ef919 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; + +public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { + + private Translog.Operation[] operations; + + public BulkShardOperationsRequest() { + + } + + public BulkShardOperationsRequest(final ShardId shardId, final Translog.Operation[] operations) { + super(shardId); + setRefreshPolicy(RefreshPolicy.NONE); + this.operations = operations; + } + + public Translog.Operation[] getOperations() { + return operations; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeArray(Translog.Operation::writeOperation, operations); + } + + @Override + public String toString() { + return "BulkShardOperationsRequest{" + + "operations=" + operations.length+ + ", shardId=" + shardId + + ", timeout=" + timeout + + ", index='" + index + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + '}'; + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java new file mode 100644 index 00000000000..38b13dca9c0 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class BulkShardOperationsRequestBuilder + extends ActionRequestBuilder { + + public BulkShardOperationsRequestBuilder(final ElasticsearchClient client) { + super(client, BulkShardOperationsAction.INSTANCE, new BulkShardOperationsRequest()); + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java new file mode 100644 index 00000000000..62612e4bb4b --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; + +public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse { + + @Override + public void setForcedRefresh(final boolean forcedRefresh) { + + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java new file mode 100644 index 00000000000..b794ad10eac --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class TransportBulkShardOperationsAction + extends TransportWriteAction { + + @Inject + public TransportBulkShardOperationsAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final IndicesService indicesService, + final ThreadPool threadPool, + final ShardStateAction shardStateAction, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super( + settings, + BulkShardOperationsAction.NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver, + BulkShardOperationsRequest::new, + BulkShardOperationsRequest::new, + ThreadPool.Names.BULK); + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary( + final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { + final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY); + return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger); + } + + @Override + protected WriteReplicaResult shardOperationOnReplica( + final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { + final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA); + return new WriteReplicaResult<>(request, location, null, replica, logger); + } + + private Translog.Location applyTranslogOperations( + final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { + Translog.Location location = null; + for (final Translog.Operation operation : request.getOperations()) { + final Engine.Result result = shard.applyTranslogOperation(operation, origin, m -> {}); + assert result.getSeqNo() == operation.seqNo(); + assert result.hasFailure() == false; + location = locationToSync(location, result.getTranslogLocation()); + } + assert request.getOperations().length == 0 || location != null; + return location; + } + + @Override + protected BulkShardOperationsResponse newResponseInstance() { + return new BulkShardOperationsResponse(); + } + +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 72a92b69d18..a47c37fb53d 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -24,9 +24,12 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -40,12 +43,15 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -85,8 +91,7 @@ public class ShardChangesIT extends ESIntegTestCase { return nodePlugins(); } - // Something like this will emulate what the xdrc persistent task will do for pulling - // the changes: + // this emulates what the CCR persistent task will do for pulling public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { client().admin().indices().prepareCreate("index") .setSettings(Settings.builder().put("index.number_of_shards", 1)) @@ -104,16 +109,16 @@ public class ShardChangesIT extends ESIntegTestCase { request.setMinSeqNo(0L); request.setMaxSeqNo(globalCheckPoint); ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); - assertThat(response.getOperations().size(), equalTo(3)); - Translog.Index operation = (Translog.Index) response.getOperations().get(0); + assertThat(response.getOperations().length, equalTo(3)); + Translog.Index operation = (Translog.Index) response.getOperations()[0]; assertThat(operation.seqNo(), equalTo(0L)); assertThat(operation.id(), equalTo("1")); - operation = (Translog.Index) response.getOperations().get(1); + operation = (Translog.Index) response.getOperations()[1]; assertThat(operation.seqNo(), equalTo(1L)); assertThat(operation.id(), equalTo("2")); - operation = (Translog.Index) response.getOperations().get(2); + operation = (Translog.Index) response.getOperations()[2]; assertThat(operation.seqNo(), equalTo(2L)); assertThat(operation.id(), equalTo("3")); @@ -129,98 +134,151 @@ public class ShardChangesIT extends ESIntegTestCase { request.setMinSeqNo(3L); request.setMaxSeqNo(globalCheckPoint); response = client().execute(ShardChangesAction.INSTANCE, request).get(); - assertThat(response.getOperations().size(), equalTo(3)); - operation = (Translog.Index) response.getOperations().get(0); + assertThat(response.getOperations().length, equalTo(3)); + operation = (Translog.Index) response.getOperations()[0]; assertThat(operation.seqNo(), equalTo(3L)); assertThat(operation.id(), equalTo("3")); - operation = (Translog.Index) response.getOperations().get(1); + operation = (Translog.Index) response.getOperations()[1]; assertThat(operation.seqNo(), equalTo(4L)); assertThat(operation.id(), equalTo("4")); - operation = (Translog.Index) response.getOperations().get(2); + operation = (Translog.Index) response.getOperations()[2]; assertThat(operation.seqNo(), equalTo(5L)); assertThat(operation.id(), equalTo("5")); } - public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - assertAcked(client().admin().indices().prepareCreate("index1") - .setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards))); - assertAcked(client().admin().indices().prepareCreate("index2") - .setSettings(Settings.builder().put("index.number_of_shards", numberOfPrimaryShards))); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap()); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + final String followerIndexSettings = + getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + ensureGreen("index1", "index2"); - FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); followRequest.setLeaderIndex("index1"); followRequest.setFollowIndex("index2"); client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); - int numDocs = randomIntBetween(2, 64); - for (int i = 0; i < numDocs; i++) { - client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + final int firstBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - final Map numDocsPerShard = new HashMap<>(); - ShardStats[] shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (ShardStats shardStat : shardStats) { - if (shardStat.getShardRouting().primary()) { - long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1; - numDocsPerShard.put(shardStat.getShardRouting().shardId(), value); + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); } } - assertBusy(() -> { - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); - for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { - ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); - ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); - assertThat(status, notNullValue()); - assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); - } - }); - - numDocs = randomIntBetween(2, 64); - for (int i = 0; i < numDocs; i++) { - client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); } - numDocsPerShard.clear(); - shardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (ShardStats shardStat : shardStats) { - if (shardStat.getShardRouting().primary()) { - long value = shardStat.getStats().getIndexing().getTotal().getIndexCount() - 1; - numDocsPerShard.put(shardStat.getShardRouting().shardId(), value); + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); } } - assertBusy(() -> { - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); - for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { - ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); - ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); - assertThat(status, notNullValue()); - assertThat(status.getProcessedGlobalCheckpoint(), equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); - } - }); + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } - UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); + final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); unfollowRequest.setFollowIndex("index2"); client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); assertBusy(() -> { - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasks.tasks().size(), equalTo(0)); }); } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { + return () -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + + for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { + final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); + final ShardFollowTask.Status status = (ShardFollowTask.Status) task.getStatus(); + assertThat(status, notNullValue()); + assertThat( + status.getProcessedGlobalCheckpoint(), + equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); + } + }; + } + + private CheckedRunnable assertExpectedDocumentRunnable(final int value) { + return () -> { + final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get(); + assertTrue(getResponse.isExists()); + assertTrue((getResponse.getSource().containsKey("f"))); + assertThat(getResponse.getSource().get("f"), equalTo(value)); + }; + } + + private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfPrimaryShards); + for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + builder.startObject("mappings"); + { + builder.startObject("doc"); + { + builder.startObject("properties"); + { + builder.startObject("f"); + { + builder.field("type", "integer"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + settings = builder.string(); + } + return settings; + } + } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 67cc05de8b7..839e493c75e 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -19,7 +19,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESSingleNodeTestCase; import org.mockito.Mockito; -import java.util.List; +import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -49,12 +49,12 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { int min = randomIntBetween(0, numWrites - 1); int max = randomIntBetween(min, numWrites - 1); - final List operations = ShardChangesAction.getOperationsBetween(indexShard, min, max); + final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max); /* * We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple * generations) so the best we can assert is that we see the expected operations. */ - final Set seenSeqNos = operations.stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()); + final Set seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toSet()); final Set expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 4e8d870a096..d169be667cd 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -5,25 +5,17 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.AbstractStreamableTestCase; -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.nullValue; - public class ShardChangesResponseTests extends AbstractStreamableTestCase { @Override protected ShardChangesAction.Response createTestInstance() { - int numOps = randomInt(8); - List operations = new ArrayList<>(numOps); + final int numOps = randomInt(8); + final Translog.Operation[] operations = new Translog.Operation[numOps]; for (int i = 0; i < numOps; i++) { - operations.add(new Translog.NoOp(i, 0, "test")); + operations[i] = new Translog.NoOp(i, 0, "test"); } return new ShardChangesAction.Response(operations); }