Changed the chunk coordinator to process the chunks concurrently. (#3256)

The shard follow task executor determines the range of translog operations
that is missing, between the last seqno known to have been processed by the
current shard follow task and the leader shard's global checkpoint.

The chunks coordinator then splits this range into smaller ranges if the
requested range exceeds the configured max chunk size. If the entire range
fits within the max chunk size, the chunks coordinator has just one chunk
to coordinate.
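For illustration, a minimal standalone sketch of that splitting logic (it
mirrors the ChunksCoordinator#createChucks method in the diff below; the
class and method names here are invented for the example):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ChunkingSketch {
    // Split the seqno range [from, to] into chunks of at most maxChunkSize
    // operations. The first chunk starts at `from` itself; every later chunk
    // starts one past the previous chunk's upper bound, so ranges don't overlap.
    static Queue<long[]> createChunks(long from, long to, long maxChunkSize) {
        Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
        for (long i = from; i < to; i += maxChunkSize) {
            long end = Math.min(i + maxChunkSize, to);
            chunks.add(new long[]{i == from ? i : i + 1, end});
        }
        return chunks;
    }
}

For example, createChunks(0, 4096, 1024) yields the chunks [0, 1024],
[1025, 2048], [2049, 3072] and [3073, 4096].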

Each chunk is added to a queue and processed by the ChunkProcessor, which
reads the translog ops from the leader shard and then indexes these translog
ops into the follow shard. After that a new chunk is polled from the queue
and the ChunkProcessor performs the same actions, until there are no more
chunks in the queue to process. The shard follow task executor then
determines a new range of translog operations to process.
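In outline, each processor runs a loop like the following sketch. This is
simplified from the ChunksCoordinator#processNextChunk and ChunkProcessor
code in the diff below; OpsReader and OpsWriter are hypothetical stand-ins
for the async ShardChangesAction (read ops from the leader) and
BulkShardOperationsAction (index into the follower) calls:

import java.util.Queue;
import java.util.function.Consumer;

class ChunkLoopSketch {
    interface OpsReader { void read(long from, long to, Consumer<Exception> callback); }
    interface OpsWriter { void write(Consumer<Exception> callback); }

    final Queue<long[]> chunks;
    final OpsReader reader;
    final OpsWriter writer;
    final Consumer<Exception> handler;

    ChunkLoopSketch(Queue<long[]> chunks, OpsReader reader, OpsWriter writer,
                    Consumer<Exception> handler) {
        this.chunks = chunks;
        this.reader = reader;
        this.writer = writer;
        this.handler = handler;
    }

    // Poll a chunk, replay it, then continue with the next chunk until the
    // queue is drained; any failure short-circuits to the handler.
    void processNextChunk() {
        long[] chunk = chunks.poll();
        if (chunk == null) {
            handler.accept(null); // queue drained: a new range can be determined
            return;
        }
        reader.read(chunk[0], chunk[1], readFailure -> {
            if (readFailure != null) {
                handler.accept(readFailure);
            } else {
                writer.write(writeFailure -> {
                    if (writeFailure != null) {
                        handler.accept(writeFailure);
                    } else {
                        processNextChunk();
                    }
                });
            }
        });
    }
}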

This change makes the chunk coordinator poll from the chunk queue with
multiple threads at the same time, so that it can better keep up with a
higher indexing load on the leader side.
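In simplified form (the real implementation is ChunksCoordinator#start
further down, which uses Elasticsearch's CountDown helper and per-chunk
async callbacks rather than a blocking loop):

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ConcurrentChunksSketch {
    // N workers poll the same queue; the handler runs exactly once, when the
    // last worker finds the queue empty. Per-chunk failure handling omitted.
    static void start(Queue<long[]> chunks, Executor ccrExecutor,
                      int concurrentProcessors, Consumer<Exception> handler) {
        AtomicInteger countDown = new AtomicInteger(concurrentProcessors);
        for (int i = 0; i < concurrentProcessors; i++) {
            ccrExecutor.execute(() -> {
                long[] chunk;
                while ((chunk = chunks.poll()) != null) {
                    // read translog ops for [chunk[0], chunk[1]] from the
                    // leader and index them into the follower (omitted)
                }
                if (countDown.decrementAndGet() == 0) {
                    handler.accept(null); // all workers done, no failure recorded
                }
            });
        }
    }
}

Because the queue is a ConcurrentLinkedQueue, the workers can poll it safely
without additional locking.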
Commit a56b0479e1 (parent 38aea9588b), authored by Martijn van Groningen on 2018-01-10 15:29:11 +01:00 and committed via GitHub.
6 changed files with 392 additions and 38 deletions

FollowExistingIndexAction.java

@@ -58,6 +58,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
public String getLeaderIndex() {
return leaderIndex;
@@ -87,6 +88,17 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
this.batchSize = batchSize;
}
public int getConcurrentProcessors() {
return concurrentProcessors;
}
public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
}
@Override
public ActionRequestValidationException validate() {
return null;
@@ -168,7 +180,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize);
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize, request.concurrentProcessors);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override

ShardFollowTask.java

@@ -30,11 +30,12 @@ public class ShardFollowTask implements PersistentTaskParams {
static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index");
static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid");
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
static final ParseField BATCH_SIZE = new ParseField("batch_size");
public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size");
public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks");
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]),
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6]));
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
@@ -43,23 +44,27 @@ public class ShardFollowTask implements PersistentTaskParams {
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BATCH_SIZE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS);
}
private final ShardId followShardId;
private final ShardId leaderShardId;
private final long batchSize;
private final long maxChunkSize;
private final int numConcurrentChunks;
public ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long batchSize) {
ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks) {
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.batchSize = batchSize;
this.maxChunkSize = maxChunkSize;
this.numConcurrentChunks = numConcurrentChunks;
}
public ShardFollowTask(StreamInput in) throws IOException {
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.batchSize = in.readVLong();
this.maxChunkSize = in.readVLong();
this.numConcurrentChunks = in.readVInt();
}
public ShardId getFollowShardId() {
@@ -70,8 +75,12 @@ public class ShardFollowTask implements PersistentTaskParams {
return leaderShardId;
}
public long getBatchSize() {
return batchSize;
public long getMaxChunkSize() {
return maxChunkSize;
}
public int getNumConcurrentChunks() {
return numConcurrentChunks;
}
@Override
@@ -83,7 +92,8 @@ public class ShardFollowTask implements PersistentTaskParams {
public void writeTo(StreamOutput out) throws IOException {
followShardId.writeTo(out);
leaderShardId.writeTo(out);
out.writeVLong(batchSize);
out.writeVLong(maxChunkSize);
out.writeVInt(numConcurrentChunks);
}
public static ShardFollowTask fromXContent(XContentParser parser) {
@@ -99,7 +109,8 @@ public class ShardFollowTask implements PersistentTaskParams {
builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName());
builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID());
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
builder.field(BATCH_SIZE.getPreferredName(), batchSize);
builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize);
builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks);
return builder.endObject();
}
@@ -110,12 +121,13 @@ public class ShardFollowTask implements PersistentTaskParams {
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId) &&
batchSize == that.batchSize;
maxChunkSize == that.maxChunkSize &&
numConcurrentChunks == that.numConcurrentChunks;
}
@Override
public int hashCode() {
return Objects.hash(followShardId, leaderShardId, batchSize);
return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks);
}
public String toString() {

ShardFollowTasksExecutor.java

@@ -5,15 +5,21 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
@@ -30,11 +36,15 @@ import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
static final long DEFAULT_BATCH_SIZE = 1024;
static final int PROCESSOR_RETRY_LIMIT = 16;
static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private final Client client;
@@ -68,15 +78,18 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
} else {
followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED;
}
prepare(task, params.getLeaderShardId(), params.getFollowShardId(), params.getBatchSize(), followGlobalCheckPoint);
logger.info("Starting shard following [{}]", params);
prepare(task, params, followGlobalCheckPoint);
}
void prepare(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long batchSize, long followGlobalCheckPoint) {
void prepare(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
}
final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
client.admin().indices().stats(new IndicesStatsRequest().indices(leaderShard.getIndexName()), ActionListener.wrap(r -> {
Optional<ShardStats> leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard))
@@ -87,7 +100,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
final long leaderGlobalCheckPoint = leaderShardStats.get().getSeqNoStats().getGlobalCheckpoint();
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(task, leaderShard, followerShard, batchSize, followGlobalCheckPoint);
retry(task, params, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@@ -96,17 +109,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
if (e == null) {
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
task.updatePersistentStatus(newStatus, ActionListener.wrap(persistentTask -> prepare(task,
leaderShard, followerShard, batchSize, leaderGlobalCheckPoint), task::markAsFailed)
task.updatePersistentStatus(newStatus, ActionListener.wrap(
persistentTask -> prepare(task, params, leaderGlobalCheckPoint), task::markAsFailed)
);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, ccrExecutor, batchSize, leaderShard, followerShard, handler);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.processChuck();
coordinator.start();
}
} else {
task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard"));
@@ -114,8 +127,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}, task::markAsFailed));
}
private void retry(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long batchSize,
long followGlobalCheckPoint) {
private void retry(AllocatedPersistentTask task, ShardFollowTask params, long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@@ -124,55 +136,98 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
prepare(task, leaderShard, followerShard, batchSize, followGlobalCheckPoint);
prepare(task, params, followGlobalCheckPoint);
}
});
}
static class ChunksCoordinator {
private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);
private final Client client;
private final Executor ccrExecutor;
private final long batchSize;
private final int concurrentProcessors;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, ShardId leaderShard, ShardId followerShard,
Consumer<Exception> handler) {
private final CountDown countDown;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, int concurrentProcessors,
ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.client = client;
this.ccrExecutor = ccrExecutor;
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.countDown = new CountDown(concurrentProcessors);
}
void createChucks(long from, long to) {
LOGGER.debug("[{}] Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize < to ? i + batchSize : to;
chunks.add(new long[]{i == from ? i : i + 1, v2});
}
}
void processChuck() {
void start() {
LOGGER.debug("[{}] Start coordination of [{}] chunks with [{}] concurrent processors",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("[{}] Failure starting processor", leaderShard), e);
postProcessChuck(e);
}
@Override
protected void doRun() throws Exception {
processNextChunk();
}
});
}
}
void processNextChunk() {
long[] chunk = chunks.poll();
if (chunk == null) {
handler.accept(null);
postProcessChuck(null);
return;
}
LOGGER.debug("[{}] Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
ChunkProcessor processor = new ChunkProcessor(client, ccrExecutor, leaderShard, followerShard, e -> {
if (e == null) {
processChuck();
LOGGER.debug("[{}] Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
processNextChunk();
} else {
handler.accept(e);
LOGGER.error(() -> new ParameterizedMessage("[{}] Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
}
});
processor.start(chunk[0], chunk[1]);
}
void postProcessChuck(Exception e) {
// e is null when a processor completes normally; only record real failures
if (e != null && failureHolder.compareAndSet(null, e) == false) {
Exception firstFailure = failureHolder.get();
firstFailure.addSuppressed(e);
}
if (countDown.countDown()) {
handler.accept(failureHolder.get());
}
}
Queue<long[]> getChunks() {
return chunks;
}
@@ -187,6 +242,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);
ChunkProcessor(Client client, Executor ccrExecutor, ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.client = client;
@@ -209,8 +265,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
public void onFailure(Exception e) {
assert e != null;
if (shouldRetry(e)) {
if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
start(from, to);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + PROCESSOR_RETRY_LIMIT +
"] times, aborting...", e));
}
} else {
handler.accept(e);
}
}
});
}
@@ -233,6 +298,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
}
@@ -241,6 +308,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
});
}
boolean shouldRetry(Exception e) {
// TODO: What other exceptions should be retried?
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e);
}
}
}

RestFollowExistingIndexAction.java

@@ -15,6 +15,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import java.io.IOException;
@@ -41,8 +42,11 @@ public class RestFollowExistingIndexAction extends BaseRestHandler {
Request request = new Request();
request.setLeaderIndex(restRequest.param("leader_index"));
request.setFollowIndex(restRequest.param("follow_index"));
if (restRequest.hasParam("batch_size")) {
request.setBatchSize(Long.valueOf(restRequest.param("batch_size")));
if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) {
request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())));
}
if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) {
request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())));
}
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override

ChunksCoordinatorTests.java

@@ -5,27 +5,49 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ChunksCoordinatorTests extends ESTestCase {
public void testCreateChunks() {
Client client = mock(Client.class);
Executor ccrExecutor = mock(Executor.class);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, leaderShardId, followShardId, e -> {});
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, 1, leaderShardId, followShardId, e -> {});
coordinator.createChucks(0, 1024);
List<long[]> result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(1));
@@ -70,4 +92,235 @@
assertThat(result.get(4)[1], equalTo(8196L));
}
public void testCoordinator() throws Exception {
Client client = mock(Client.class);
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
Consumer<Exception> handler = e -> assertThat(e, nullValue());
int concurrentProcessors = randomIntBetween(1, 4);
int batchSize = randomIntBetween(1, 1000);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, batchSize, concurrentProcessors,
leaderShardId, followShardId, handler);
int numberOfOps = randomIntBetween(batchSize, batchSize * 20);
long from = randomInt(1000);
long to = from + numberOfOps;
coordinator.createChucks(from, to);
int expectedNumberOfChunks = numberOfOps / batchSize;
if (numberOfOps % batchSize > 0) {
expectedNumberOfChunks++;
}
assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks));
coordinator.start();
assertThat(coordinator.getChunks().size(), equalTo(0));
verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE),
any(ShardChangesAction.Request.class), any(ActionListener.class));
verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any(ActionListener.class));
}
public void testCoordinator_failure() throws Exception {
Exception expectedException = new RuntimeException("throw me");
Client client = mock(Client.class);
boolean shardChangesActionApiCallFailed;
if (randomBoolean()) {
shardChangesActionApiCallFailed = true;
doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE),
any(ShardChangesAction.Request.class), any(ActionListener.class));
} else {
shardChangesActionApiCallFailed = false;
mockShardChangesApiCall(client);
doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any(ActionListener.class));
}
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
Consumer<Exception> handler = e -> {
assertThat(e, notNullValue());
assertThat(e, sameInstance(expectedException));
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 10, 1, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 20);
assertThat(coordinator.getChunks().size(), equalTo(2));
coordinator.start();
assertThat(coordinator.getChunks().size(), equalTo(1));
verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class),
any(ActionListener.class));
verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any(ActionListener.class));
}
public void testCoordinator_concurrent() throws Exception {
Client client = mock(Client.class);
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = command -> new Thread(command).start();
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
AtomicBoolean calledOnceChecker = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Consumer<Exception> handler = e -> {
if (failureHolder.compareAndSet(null, e) == false) {
// This handler should only be called once, regardless of the number of concurrent processors
calledOnceChecker.set(true);
}
latch.countDown();
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1000, 4, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 1000000);
assertThat(coordinator.getChunks().size(), equalTo(1000));
coordinator.start();
latch.await();
assertThat(coordinator.getChunks().size(), equalTo(0));
verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class),
any(ActionListener.class));
verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class),
any(ActionListener.class));
assertThat(calledOnceChecker.get(), is(false));
}
public void testChunkProcessor() {
Client client = mock(Client.class);
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
}
public void testChunkProcessorRetry() {
Client client = mock(Client.class);
mockBulkShardOperationsApiCall(client);
// stay below the limit, otherwise the processor gives up before the mock succeeds
int testRetryLimit = randomIntBetween(1, ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT - 1);
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1));
}
public void testChunkProcessorRetryTooManyTimes() {
Client client = mock(Client.class);
mockBulkShardOperationsApiCall(client);
int testRetryLimit = ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT + 1;
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
assertThat(exception[0].getMessage(), equalTo("retrying failed [16] times, aborting..."));
assertThat(exception[0].getCause().getMessage(), equalTo("connection exception"));
assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit));
}
public void testChunkProcessorNoneRetryableError() {
Client client = mock(Client.class);
mockBulkShardOperationsApiCall(client);
mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
assertThat(exception[0].getMessage(), equalTo("unexpected"));
assertThat(chunkProcessor.retryCounter.get(), equalTo(0));
}
private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) {
int[] retryCounter = new int[1];
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
ShardChangesAction.Request request = (ShardChangesAction.Request) args[1];
@SuppressWarnings("unchecked")
ActionListener<ShardChangesAction.Response> listener = (ActionListener) args[2];
if (retryCounter[0]++ <= testRetryLimit) {
listener.onFailure(e);
} else {
long delta = request.getMaxSeqNo() - request.getMinSeqNo();
Translog.Operation[] operations = new Translog.Operation[(int) delta];
for (int i = 0; i < operations.length; i++) {
operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test");
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations);
listener.onResponse(response);
}
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any(ActionListener.class));
}
private void mockShardChangesApiCall(Client client) {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
ShardChangesAction.Request request = (ShardChangesAction.Request) args[1];
@SuppressWarnings("unchecked")
ActionListener<ShardChangesAction.Response> listener = (ActionListener) args[2];
long delta = request.getMaxSeqNo() - request.getMinSeqNo();
Translog.Operation[] operations = new Translog.Operation[(int) delta];
for (int i = 0; i < operations.length; i++) {
operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test");
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations);
listener.onResponse(response);
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any(ActionListener.class));
}
private void mockBulkShardOperationsApiCall(Client client) {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
@SuppressWarnings("unchecked")
ActionListener<BulkShardOperationsResponse> listener = (ActionListener) args[2];
listener.onResponse(new BulkShardOperationsResponse());
return null;
}).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class),
any(ActionListener.class));
}
}

ShardFollowTaskTests.java

@@ -24,7 +24,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
return new ShardFollowTask(
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
randomIntBetween(1, Integer.MAX_VALUE)
randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE)
);
}