[CCR] Add more unit tests for shard follow task (#32121)

The added tests are based on specific scenarios as described in the test plan.
Before this change the ShardFollowNodeTaskTests contained more random like tests,
but these have been removed and in a followup pr better random tests will
be added in a new test class as is described in the test plan.
This commit is contained in:
Martijn van Groningen 2018-07-20 14:12:05 +02:00 committed by GitHub
parent d0f3ed5abd
commit a6b7497fdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 710 additions and 207 deletions

View File

@ -277,7 +277,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
return EMPTY_OPERATIONS_ARRAY;
}
int seenBytes = 0;
long toSeqNo = Math.min(globalCheckpoint, fromSeqNo + maxOperationCount);
// - 1 is needed, because toSeqNo is inclusive
long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1);
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
Translog.Operation op;

View File

@ -67,7 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private volatile int numConcurrentWrites = 0;
private volatile long followerGlobalCheckpoint = 0;
private volatile long currentIndexMetadataVersion = 0;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed());
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler) {
@ -78,10 +78,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
}
void start(long followerGlobalCheckpoint) {
void start(long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
this.lastRequestedSeqno = followerGlobalCheckpoint;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.leaderGlobalCheckpoint = followerGlobalCheckpoint;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
// Forcefully updates follower mapping, this gets us the leader imd version and
// makes sure that leader and follower mapping are identical.
@ -93,7 +93,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
});
}
private synchronized void coordinateReads() {
synchronized void coordinateReads() {
if (isStopped()) {
LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId());
return;
@ -105,7 +105,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
numConcurrentReads++;
long from = lastRequestedSeqno + 1;
long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount);
// -1 is needed, because maxRequiredSeqno is inclusive
long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1);
LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount);
sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno);
lastRequestedSeqno = maxRequiredSeqno;
@ -137,6 +138,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
private synchronized void coordinateWrites() {
if (isStopped()) {
LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId());
return;
}
while (hasWriteBudget() && buffer.isEmpty() == false) {
long sumEstimatedSize = 0L;
int length = Math.min(params.getMaxBatchOperationCount(), buffer.size());
@ -176,9 +182,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
e -> handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)));
}
private void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> {
synchronized (ShardFollowNodeTask.this) {
void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
}
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
final long newMinRequiredSeqNo;
if (response.getOperations().length == 0) {
@ -198,7 +206,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
coordinateWrites();
}
if (newMinRequiredSeqNo < maxRequiredSeqNo) {
if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) {
int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1;
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo);
@ -217,8 +225,6 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
}
}
});
}
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations) {
sendBulkShardOperationsRequest(operations, new AtomicInteger(0));
@ -306,7 +312,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
@Override
public Status getStatus() {
return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numConcurrentReads, numConcurrentWrites);
return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numConcurrentReads, numConcurrentWrites,
currentIndexMetadataVersion);
}
public static class Status implements Task.Status {
@ -318,9 +325,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField("last_requested_seqno");
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes");
static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version");
static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME,
args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4]));
args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4], (long) args[5]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
@ -328,6 +336,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
}
private final long leaderGlobalCheckpoint;
@ -335,14 +344,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final long followerGlobalCheckpoint;
private final int numberOfConcurrentReads;
private final int numberOfConcurrentWrites;
private final long indexMetadataVersion;
Status(long leaderGlobalCheckpoint, long lastRequestedSeqno, long followerGlobalCheckpoint,
int numberOfConcurrentReads, int numberOfConcurrentWrites) {
int numberOfConcurrentReads, int numberOfConcurrentWrites, long indexMetadataVersion) {
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.lastRequestedSeqno = lastRequestedSeqno;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.numberOfConcurrentReads = numberOfConcurrentReads;
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
this.indexMetadataVersion = indexMetadataVersion;
}
public Status(StreamInput in) throws IOException {
@ -351,6 +362,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.followerGlobalCheckpoint = in.readZLong();
this.numberOfConcurrentReads = in.readVInt();
this.numberOfConcurrentWrites = in.readVInt();
this.indexMetadataVersion = in.readVLong();
}
public long getLeaderGlobalCheckpoint() {
@ -373,6 +385,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return numberOfConcurrentWrites;
}
public long getIndexMetadataVersion() {
return indexMetadataVersion;
}
@Override
public String getWriteableName() {
return NAME;
@ -385,6 +401,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
out.writeZLong(followerGlobalCheckpoint);
out.writeVInt(numberOfConcurrentReads);
out.writeVInt(numberOfConcurrentWrites);
out.writeVLong(indexMetadataVersion);
}
@Override
@ -396,6 +413,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
builder.field(LAST_REQUESTED_SEQNO_FIELD.getPreferredName(), lastRequestedSeqno);
builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion);
}
builder.endObject();
return builder;
@ -414,13 +432,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
lastRequestedSeqno == status.lastRequestedSeqno &&
followerGlobalCheckpoint == status.followerGlobalCheckpoint &&
numberOfConcurrentReads == status.numberOfConcurrentReads &&
numberOfConcurrentWrites == status.numberOfConcurrentWrites;
numberOfConcurrentWrites == status.numberOfConcurrentWrites &&
indexMetadataVersion == status.indexMetadataVersion;
}
@Override
public int hashCode() {
return Objects.hash(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numberOfConcurrentReads,
numberOfConcurrentWrites);
numberOfConcurrentWrites, indexMetadataVersion);
}
public String toString() {

View File

@ -142,7 +142,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Client followerClient = wrapClient(client, params);
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), shardFollowNodeTask::start, task::markAsFailed);
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
followerGCP -> shardFollowNodeTask.start(followerGCP, followerGCP), task::markAsFailed);
}
private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {

View File

@ -48,7 +48,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard,
indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE);
final List<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList());
final List<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList());
final List<Long> expectedSeqNos = LongStream.range(min, max).boxed().collect(Collectors.toList());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}

View File

@ -21,7 +21,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
@Override
protected ShardFollowNodeTask.Status createTestInstance() {
return new ShardFollowNodeTask.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE));
randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong());
}
@Override

View File

@ -5,205 +5,683 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
public class ShardFollowNodeTaskTests extends ESTestCase {
private ShardFollowNodeTask task;
private Exception fatalError;
private List<long[]> shardChangesRequests;
private List<List<Translog.Operation>> bulkShardOperationRequests;
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
private AtomicLong leaderGlobalCheckPoint;
private AtomicLong imdVersion;
private AtomicInteger mappingUpdateCounter;
private Queue<Exception> readFailures;
private Queue<Exception> writeFailures;
private Queue<Exception> mappingUpdateFailures;
private Queue<Long> imdVersions;
private Queue<Long> followerGlobalCheckpoints;
private AtomicInteger truncatedRequests;
private AtomicBoolean randomlyTruncateRequests;
public void testCoordinateReads() {
ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
private AtomicInteger failedRequests;
private AtomicBoolean randomlyFailWithRetryableError;
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(8));
assertThat(shardChangesRequests, contains(new long[][]{
{0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}}
));
private AtomicReference<Exception> failureHolder = new AtomicReference<>();
public void testDefaults() throws Exception {
long followGlobalCheckpoint = randomIntBetween(-1, 2048);
task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT,
ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES,
10000, ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, followGlobalCheckpoint);
task.start(followGlobalCheckpoint);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L));
});
assertThat(mappingUpdateCounter.get(), equalTo(1));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(8));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
}
public void testHitBufferLimit() throws Exception {
// Setting buffer limit to 100, so that we are sure the limit will be met
task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, 3, 1, 10000, 100, -1);
task.start(-1);
public void testWriteBuffer() {
// Need to set concurrentWrites to 0, other the write buffer gets flushed immediately:
ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE);
startTask(task, 64, -1);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L));
});
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
// Also invokes the coordinatesReads() method:
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L));
}
public void testConcurrentReadsAndWrites() throws Exception {
long followGlobalCheckpoint = randomIntBetween(-1, 2048);
task = createShardFollowTask(randomIntBetween(32, 2048), randomIntBetween(2, 10),
randomIntBetween(2, 10), 50000, 10240, followGlobalCheckpoint);
task.start(followGlobalCheckpoint);
public void testMaxConcurrentReads() {
ShardFollowNodeTask task = createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(50000L));
});
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(8L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(7L));
}
public void testMappingUpdate() throws Exception {
task = createShardFollowTask(1024, 1, 1, 1000, 1024, -1);
task.start(-1);
public void testTaskCancelled() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), greaterThanOrEqualTo(1000L));
});
imdVersion.set(2L);
leaderGlobalCheckPoint.set(10000L);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L));
});
assertThat(mappingUpdateCounter.get(), equalTo(2));
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
// The call the updateMapping is a noop, so noting happens.
task.start(128L, task.getStatus().getFollowerGlobalCheckpoint());
task.markAsCompleted();
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(0));
}
public void testOccasionalApiFailure() throws Exception {
task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1);
task.start(-1);
randomlyFailWithRetryableError.set(true);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L));
});
assertThat(failedRequests.get(), greaterThan(0));
public void testTaskCancelledAfterReadLimitHasBeenReached() {
ShardFollowNodeTask task = createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 31, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(16L));
task.markAsCompleted();
shardChangesRequests.clear();
// Also invokes the coordinatesReads() method:
task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 31L));
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(15L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(31L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testNotAllExpectedOpsReturned() throws Exception {
task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1);
task.start(-1);
randomlyTruncateRequests.set(true);
assertBusy(() -> {
assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L));
});
assertThat(truncatedRequests.get(), greaterThan(0));
public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, 32, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
task.markAsCompleted();
shardChangesRequests.clear();
// Also invokes the coordinatesReads() method:
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBathces,
int globalCheckpoint, int bufferWriteLimit, long followGlobalCheckpoint) {
leaderGlobalCheckPoint = new AtomicLong(globalCheckpoint);
imdVersion = new AtomicLong(1L);
mappingUpdateCounter = new AtomicInteger(0);
randomlyTruncateRequests = new AtomicBoolean(false);
truncatedRequests = new AtomicInteger();
randomlyFailWithRetryableError = new AtomicBoolean(false);
failedRequests = new AtomicInteger(0);
public void testReceiveRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(1, 10);
for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
task.coordinateReads();
// NUmber of requests is equal to initial request + retried attempts
assertThat(shardChangesRequests.size(), equalTo(max + 1));
for (long[] shardChangesRequest : shardChangesRequests) {
assertThat(shardChangesRequest[0], equalTo(0L));
assertThat(shardChangesRequest[1], equalTo(64L));
}
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(11, 32);
for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(11));
for (long[] shardChangesRequest : shardChangesRequests) {
assertThat(shardChangesRequest[0], equalTo(0L));
assertThat(shardChangesRequest[1], equalTo(64L));
}
assertThat(task.isStopped(), equalTo(true));
assertThat(fatalError, notNullValue());
assertThat(fatalError.getMessage(), containsString("retrying failed ["));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
Exception failure = new RuntimeException();
readFailures.add(failure);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
assertThat(task.isStopped(), equalTo(true));
assertThat(fatalError, sameInstance(failure));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testHandleReadResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testReceiveLessThanRequested() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(32L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testCancelAndReceiveLessThanRequested() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
task.markAsCompleted();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(shardChangesRequests.size(), equalTo(0));
assertThat(bulkShardOperationRequests.size(), equalTo(0));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveNothingExpectedSomething() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testDelayCoordinatesRead() {
int[] counter = new int[]{0};
scheduler = (delay, task) -> {
counter[0]++;
task.run();
};
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L);
// Also invokes coordinateReads()
task.innerHandleReadResponse(0L, 64L, response);
response = generateShardChangesResponse(0, 0, 0L, 64L);
task.innerHandleReadResponse(65L, 64L, response);
assertThat(counter[0], equalTo(1));
}
public void testMappingUpdate() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testMappingUpdateRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
int max = randomIntBetween(1, 10);
for (int i = 0; i < max; i++) {
mappingUpdateFailures.add(new ConnectException());
}
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
assertThat(mappingUpdateFailures.size(), equalTo(0));
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
}
public void testMappingUpdateRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(11, 20);
for (int i = 0; i < max; i++) {
mappingUpdateFailures.add(new ConnectException());
}
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
assertThat(mappingUpdateFailures.size(), equalTo(max - 11));
assertThat(imdVersions.size(), equalTo(1));
assertThat(bulkShardOperationRequests.size(), equalTo(0));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testMappingUpdateNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
mappingUpdateFailures.add(new RuntimeException());
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(0));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getIndexMetadataVersion(), equalTo(0L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
public void testCoordinateWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testMaxConcurrentWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(2));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()).subList(0, 64)));
assertThat(bulkShardOperationRequests.get(1), equalTo(Arrays.asList(response.getOperations()).subList(64, 128)));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(2));
task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE);
response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(4));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()).subList(0, 64)));
assertThat(bulkShardOperationRequests.get(1), equalTo(Arrays.asList(response.getOperations()).subList(64, 128)));
assertThat(bulkShardOperationRequests.get(2), equalTo(Arrays.asList(response.getOperations()).subList(128, 192)));
assertThat(bulkShardOperationRequests.get(3), equalTo(Arrays.asList(response.getOperations()).subList(192, 256)));
status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(4));
}
public void testMaxBatchOperationCount() {
ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(32));
for (int i = 0; i < 32; i += 8) {
int offset = i * 8;
assertThat(bulkShardOperationRequests.get(i), equalTo(Arrays.asList(response.getOperations()).subList(offset, offset + 8)));
}
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(32));
}
public void testRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
int max = randomIntBetween(1, 10);
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(max + 1));
for (List<Translog.Operation> operations : bulkShardOperationRequests) {
assertThat(operations, equalTo(Arrays.asList(response.getOperations())));
}
assertThat(task.isStopped(), equalTo(false));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
int max = randomIntBetween(11, 32);
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(11));
for (List<Translog.Operation> operations : bulkShardOperationRequests) {
assertThat(operations, equalTo(Arrays.asList(response.getOperations())));
}
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
writeFailures.add(new RuntimeException());
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testMaxBatchBytesLimit() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, 1L);
startTask(task, 64, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(64));
}
public void testHandleWriteResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
followerGlobalCheckpoints.add(63L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
// handleWrite() also delegates to coordinateReads
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(64L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(63L));
}
ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches,
int bufferWriteLimit, long maxBatchSizeInBytes) {
AtomicBoolean stopped = new AtomicBoolean(false);
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches,
ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, maxConcurrentWriteBathces, bufferWriteLimit,
TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10), Collections.emptyMap());
new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes,
maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap());
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> {
try {
Thread.sleep(delay.millis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Thread thread = new Thread(task);
thread.start();
};
AtomicInteger readCounter = new AtomicInteger();
AtomicInteger writeCounter = new AtomicInteger();
LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint);
shardChangesRequests = new ArrayList<>();
bulkShardOperationRequests = new ArrayList<>();
readFailures = new LinkedList<>();
writeFailures = new LinkedList<>();
mappingUpdateFailures = new LinkedList<>();
imdVersions = new LinkedList<>();
followerGlobalCheckpoints = new LinkedList<>();
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
mappingUpdateCounter.incrementAndGet();
handler.accept(imdVersion.get());
Exception failure = mappingUpdateFailures.poll();
if (failure != null) {
errorHandler.accept(failure);
return;
}
Long imdVersion = imdVersions.poll();
if (imdVersion != null) {
handler.accept(imdVersion);
}
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
if (randomlyFailWithRetryableError.get() && readCounter.incrementAndGet() % 5 == 0) {
failedRequests.incrementAndGet();
errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error"));
bulkShardOperationRequests.add(operations);
Exception writeFailure = ShardFollowNodeTaskTests.this.writeFailures.poll();
if (writeFailure != null) {
errorHandler.accept(writeFailure);
return;
}
for(Translog.Operation op : operations) {
tracker.markSeqNoAsCompleted(op.seqNo());
Long followerGlobalCheckpoint = followerGlobalCheckpoints.poll();
if (followerGlobalCheckpoint != null) {
handler.accept(followerGlobalCheckpoint);
}
// Emulate network thread and avoid SO:
Thread thread = new Thread(() -> handler.accept(tracker.getCheckpoint()));
thread.start();
}
@Override
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) {
if (randomlyFailWithRetryableError.get() && writeCounter.incrementAndGet() % 5 == 0) {
failedRequests.incrementAndGet();
errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error"));
return;
shardChangesRequests.add(new long[]{from, maxBatchOperationCount});
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
if (readFailure != null) {
errorHandler.accept(readFailure);
}
if (from < 0) {
errorHandler.accept(new IllegalArgumentException());
return;
}
ShardChangesAction.Response response;
if (from > leaderGlobalCheckPoint.get()) {
response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(), new Translog.Operation[0]);
} else {
if (randomlyTruncateRequests.get() && maxOperationCount > 10 && truncatedRequests.get() < 5) {
truncatedRequests.incrementAndGet();
maxOperationCount = maxOperationCount / 2;
}
List<Translog.Operation> ops = new ArrayList<>();
long maxSeqNo = Math.min(from + maxOperationCount, leaderGlobalCheckPoint.get());
for (long seqNo = from; seqNo <= maxSeqNo; seqNo++) {
String id = UUIDs.randomBase64UUID();
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(),
ops.toArray(new Translog.Operation[0]));
}
// Emulate network thread and avoid SO:
Thread thread = new Thread(() -> handler.accept(response));
thread.start();
}
@Override
@ -218,23 +696,27 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
@Override
public void markAsFailed(Exception e) {
fatalError = e;
stopped.set(true);
failureHolder.set(e);
}
};
}
@After
public void cancelNodeTask() throws Exception {
if (task != null){
task.markAsCompleted();
assertThat(failureHolder.get(), nullValue());
assertBusy(() -> {
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
});
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion,
long leaderGlobalCheckPoint) {
List<Translog.Operation> ops = new ArrayList<>();
for (long seqNo = fromSeqNo; seqNo < size; seqNo++) {
String id = UUIDs.randomBase64UUID();
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
return new ShardChangesAction.Response(imdVersion, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
}
void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
// The call the updateMapping is a noop, so noting happens.
task.start(leaderGlobalCheckpoint, followerGlobalCheckpoint);
}
}

View File

@ -45,7 +45,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
docCount += leaderGroup.appendDocs(randomInt(128));
leaderGroup.syncGlobalCheckpoint();
@ -62,7 +62,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leaderGroup.startAll();
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint());
int docCount = 256;
leaderGroup.appendDocs(1);
Runnable task = () -> {