diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java index 1c960841179..2619b03e951 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -57,6 +57,7 @@ public class FollowExistingIndexAction extends Action>() { @Override diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 073bf7b6cfe..ace4a1ced18 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -30,10 +30,11 @@ public class ShardFollowTask implements PersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index"); static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); + static final ParseField BATCH_SIZE = new ParseField("batch_size"); public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]), - new ShardId((String) a[3], (String) a[4], (int) a[5]))); + new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD); @@ -42,25 +43,25 @@ public class ShardFollowTask implements PersistentTaskParams { PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BATCH_SIZE); } - public ShardFollowTask(ShardId followShardId, ShardId leaderShardId) { + private final ShardId followShardId; + private final ShardId leaderShardId; + private final long batchSize; + + public ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long batchSize) { this.followShardId = followShardId; this.leaderShardId = leaderShardId; + this.batchSize = batchSize; } public ShardFollowTask(StreamInput in) throws IOException { this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); + this.batchSize = in.readVLong(); } - public static ShardFollowTask fromXContent(XContentParser parser) { - return PARSER.apply(parser, null); - } - - private final ShardId followShardId; - private final ShardId leaderShardId; - public ShardId getFollowShardId() { return followShardId; } @@ -69,6 +70,10 @@ public class ShardFollowTask implements PersistentTaskParams { return leaderShardId; } + public long getBatchSize() { + return batchSize; + } + @Override public String getWriteableName() { return NAME; @@ -78,6 +83,11 @@ public class ShardFollowTask implements PersistentTaskParams { public void writeTo(StreamOutput out) throws IOException { followShardId.writeTo(out); leaderShardId.writeTo(out); + out.writeVLong(batchSize); + } + + public static ShardFollowTask fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); } @Override @@ -89,6 +99,7 @@ public class ShardFollowTask implements PersistentTaskParams { builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); + builder.field(BATCH_SIZE.getPreferredName(), batchSize); return builder.endObject(); } @@ -98,12 +109,13 @@ public class ShardFollowTask implements PersistentTaskParams { if (o == null || getClass() != o.getClass()) return false; ShardFollowTask that = (ShardFollowTask) o; return Objects.equals(followShardId, that.followShardId) && - Objects.equals(leaderShardId, that.leaderShardId); + Objects.equals(leaderShardId, that.leaderShardId) && + batchSize == that.batchSize; } @Override public int hashCode() { - return Objects.hash(followShardId, leaderShardId); + return Objects.hash(followShardId, leaderShardId, batchSize); } public String toString() { diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index ad5bc2b67da..088b7b5d4c3 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -29,12 +29,12 @@ import java.util.Arrays; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.function.Consumer; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - // TODO: turn into cluster wide settings: - private static final long BATCH_SIZE = 256; + static final long DEFAULT_BATCH_SIZE = 1024; private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); private final Client client; @@ -91,7 +91,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, DEFAULT_BATCH_SIZE, leaderShard, followerShard, e -> { if (e == null) { ShardFollowTask.Status newStatus = new ShardFollowTask.Status(); newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint); @@ -125,23 +126,31 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler; private final Queue chunks = new ConcurrentLinkedQueue<>(); - ChunksCoordinator(ShardId leaderShard, ShardId followerShard, Consumer handler) { + ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, ShardId leaderShard, ShardId followerShard, + Consumer handler) { + this.client = client; + this.ccrExecutor = ccrExecutor; + this.batchSize = batchSize; this.leaderShard = leaderShard; this.followerShard = followerShard; this.handler = handler; } void createChucks(long from, long to) { - for (long i = from; i <= to; i += BATCH_SIZE) { - long v2 = i + BATCH_SIZE < to ? i + BATCH_SIZE : to; - chunks.add(new long[]{i, v2}); + for (long i = from; i < to; i += batchSize) { + long v2 = i + batchSize < to ? i + batchSize : to; + chunks.add(new long[]{i == from ? i : i + 1, v2}); } } @@ -151,7 +160,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + ChunkProcessor processor = new ChunkProcessor(client, ccrExecutor, leaderShard, followerShard, e -> { if (e == null) { processChuck(); } else { @@ -161,15 +170,24 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor getChunks() { + return chunks; + } + } - class ChunkProcessor { + static class ChunkProcessor { + + private final Client client; + private final Executor ccrExecutor; private final ShardId leaderShard; private final ShardId followerShard; private final Consumer handler; - ChunkProcessor(ShardId leaderShard, ShardId followerShard, Consumer handler) { + ChunkProcessor(Client client, Executor ccrExecutor, ShardId leaderShard, ShardId followerShard, Consumer handler) { + this.client = client; + this.ccrExecutor = ccrExecutor; this.leaderShard = leaderShard; this.followerShard = followerShard; this.handler = handler; @@ -194,7 +212,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor client.execute(INSTANCE, request, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java new file mode 100644 index 00000000000..c79a82adbc9 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.client.Client; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class ChunksCoordinatorTests extends ESTestCase { + + public void testCreateChunks() { + Client client = mock(Client.class); + Executor ccrExecutor = mock(Executor.class); + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, leaderShardId, followShardId, e -> {}); + coordinator.createChucks(0, 1024); + List result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(0, 2048); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + assertThat(result.get(1)[0], equalTo(1025L)); + assertThat(result.get(1)[1], equalTo(2048L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(0, 4096); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(4)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + assertThat(result.get(1)[0], equalTo(1025L)); + assertThat(result.get(1)[1], equalTo(2048L)); + assertThat(result.get(2)[0], equalTo(2049L)); + assertThat(result.get(2)[1], equalTo(3072L)); + assertThat(result.get(3)[0], equalTo(3073L)); + assertThat(result.get(3)[1], equalTo(4096L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(4096, 8196); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(5)); + assertThat(result.get(0)[0], equalTo(4096L)); + assertThat(result.get(0)[1], equalTo(5120L)); + assertThat(result.get(1)[0], equalTo(5121L)); + assertThat(result.get(1)[1], equalTo(6144L)); + assertThat(result.get(2)[0], equalTo(6145L)); + assertThat(result.get(2)[1], equalTo(7168L)); + assertThat(result.get(3)[0], equalTo(7169L)); + assertThat(result.get(3)[1], equalTo(8192L)); + assertThat(result.get(4)[0], equalTo(8193L)); + assertThat(result.get(4)[1], equalTo(8196L)); + } + +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 84f83fc5660..a636c2f0ca4 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -23,7 +23,8 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase