* Made ChunkCoordinator unittest testable

* Fixed a small issue where each batch would fetch / index the previous batch last operation
* Made batch size a request param on the follow existing index api request.
  This makes is easy to tune this param when running tests from scripts.
* Changed default batch size from 256 to 1024.
This commit is contained in:
Martijn van Groningen 2017-11-30 15:53:40 +01:00
parent 2e382bf7f3
commit c3f7d4f580
6 changed files with 145 additions and 25 deletions

View File

@ -57,6 +57,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
public String getLeaderIndex() {
return leaderIndex;
@ -74,6 +75,18 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
this.followIndex = followIndex;
}
public long getBatchSize() {
return batchSize;
}
public void setBatchSize(long batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
}
this.batchSize = batchSize;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -84,6 +97,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
}
@Override
@ -91,6 +105,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
}
}
@ -153,7 +168,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId));
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override

View File

@ -30,10 +30,11 @@ public class ShardFollowTask implements PersistentTaskParams {
static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index");
static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid");
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
static final ParseField BATCH_SIZE = new ParseField("batch_size");
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]),
new ShardId((String) a[3], (String) a[4], (int) a[5])));
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
@ -42,25 +43,25 @@ public class ShardFollowTask implements PersistentTaskParams {
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BATCH_SIZE);
}
public ShardFollowTask(ShardId followShardId, ShardId leaderShardId) {
private final ShardId followShardId;
private final ShardId leaderShardId;
private final long batchSize;
public ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long batchSize) {
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.batchSize = batchSize;
}
public ShardFollowTask(StreamInput in) throws IOException {
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.batchSize = in.readVLong();
}
public static ShardFollowTask fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
private final ShardId followShardId;
private final ShardId leaderShardId;
public ShardId getFollowShardId() {
return followShardId;
}
@ -69,6 +70,10 @@ public class ShardFollowTask implements PersistentTaskParams {
return leaderShardId;
}
public long getBatchSize() {
return batchSize;
}
@Override
public String getWriteableName() {
return NAME;
@ -78,6 +83,11 @@ public class ShardFollowTask implements PersistentTaskParams {
public void writeTo(StreamOutput out) throws IOException {
followShardId.writeTo(out);
leaderShardId.writeTo(out);
out.writeVLong(batchSize);
}
public static ShardFollowTask fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
@ -89,6 +99,7 @@ public class ShardFollowTask implements PersistentTaskParams {
builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName());
builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID());
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
builder.field(BATCH_SIZE.getPreferredName(), batchSize);
return builder.endObject();
}
@ -98,12 +109,13 @@ public class ShardFollowTask implements PersistentTaskParams {
if (o == null || getClass() != o.getClass()) return false;
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId);
Objects.equals(leaderShardId, that.leaderShardId) &&
batchSize == that.batchSize;
}
@Override
public int hashCode() {
return Objects.hash(followShardId, leaderShardId);
return Objects.hash(followShardId, leaderShardId, batchSize);
}
public String toString() {

View File

@ -29,12 +29,12 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
// TODO: turn into cluster wide settings:
private static final long BATCH_SIZE = 256;
static final long DEFAULT_BATCH_SIZE = 1024;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private final Client client;
@ -91,7 +91,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + leaderGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + followGlobalCheckPoint + "]";
ChunksCoordinator coordinator = new ChunksCoordinator(leaderShard, followerShard, e -> {
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, DEFAULT_BATCH_SIZE, leaderShard, followerShard, e -> {
if (e == null) {
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
@ -125,23 +126,31 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
});
}
class ChunksCoordinator {
static class ChunksCoordinator {
private final Client client;
private final Executor ccrExecutor;
private final long batchSize;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
ChunksCoordinator(ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, ShardId leaderShard, ShardId followerShard,
Consumer<Exception> handler) {
this.client = client;
this.ccrExecutor = ccrExecutor;
this.batchSize = batchSize;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
}
void createChucks(long from, long to) {
for (long i = from; i <= to; i += BATCH_SIZE) {
long v2 = i + BATCH_SIZE < to ? i + BATCH_SIZE : to;
chunks.add(new long[]{i, v2});
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize < to ? i + batchSize : to;
chunks.add(new long[]{i == from ? i : i + 1, v2});
}
}
@ -151,7 +160,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
handler.accept(null);
return;
}
ChunkProcessor processor = new ChunkProcessor(leaderShard, followerShard, e -> {
ChunkProcessor processor = new ChunkProcessor(client, ccrExecutor, leaderShard, followerShard, e -> {
if (e == null) {
processChuck();
} else {
@ -161,15 +170,24 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
processor.start(chunk[0], chunk[1]);
}
Queue<long[]> getChunks() {
return chunks;
}
}
class ChunkProcessor {
static class ChunkProcessor {
private final Client client;
private final Executor ccrExecutor;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
ChunkProcessor(ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ChunkProcessor(Client client, Executor ccrExecutor, ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.client = client;
this.ccrExecutor = ccrExecutor;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
@ -194,7 +212,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
void handleResponse(ShardChangesAction.Response response) {
threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;

View File

@ -41,6 +41,7 @@ public class RestFollowExistingIndexAction extends BaseRestHandler {
Request request = new Request();
request.setLeaderIndex(restRequest.param("leader_index"));
request.setFollowIndex(restRequest.param("follow_index"));
request.setBatchSize(Long.valueOf(restRequest.param("batch_size")));
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class ChunksCoordinatorTests extends ESTestCase {
public void testCreateChunks() {
Client client = mock(Client.class);
Executor ccrExecutor = mock(Executor.class);
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, leaderShardId, followShardId, e -> {});
coordinator.createChucks(0, 1024);
List<long[]> result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(1));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
coordinator.getChunks().clear();
coordinator.createChucks(0, 2048);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(2));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
assertThat(result.get(1)[0], equalTo(1025L));
assertThat(result.get(1)[1], equalTo(2048L));
coordinator.getChunks().clear();
coordinator.createChucks(0, 4096);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(4));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
assertThat(result.get(1)[0], equalTo(1025L));
assertThat(result.get(1)[1], equalTo(2048L));
assertThat(result.get(2)[0], equalTo(2049L));
assertThat(result.get(2)[1], equalTo(3072L));
assertThat(result.get(3)[0], equalTo(3073L));
assertThat(result.get(3)[1], equalTo(4096L));
coordinator.getChunks().clear();
coordinator.createChucks(4096, 8196);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(5));
assertThat(result.get(0)[0], equalTo(4096L));
assertThat(result.get(0)[1], equalTo(5120L));
assertThat(result.get(1)[0], equalTo(5121L));
assertThat(result.get(1)[1], equalTo(6144L));
assertThat(result.get(2)[0], equalTo(6145L));
assertThat(result.get(2)[1], equalTo(7168L));
assertThat(result.get(3)[0], equalTo(7169L));
assertThat(result.get(3)[1], equalTo(8192L));
assertThat(result.get(4)[0], equalTo(8193L));
assertThat(result.get(4)[1], equalTo(8196L));
}
}

View File

@ -23,7 +23,8 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
protected ShardFollowTask createTestInstance() {
return new ShardFollowTask(
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5))
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
randomIntBetween(1, Integer.MAX_VALUE)
);
}