diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index dd67f09bd72..23954bb0c80 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -277,7 +277,8 @@ public class ShardChangesAction extends Action { return EMPTY_OPERATIONS_ARRAY; } int seenBytes = 0; - long toSeqNo = Math.min(globalCheckpoint, fromSeqNo + maxOperationCount); + // - 1 is needed, because toSeqNo is inclusive + long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1); final List operations = new ArrayList<>(); try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) { Translog.Operation op; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 97c1c9fcd21..492a7e492ff 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -67,7 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private volatile int numConcurrentWrites = 0; private volatile long followerGlobalCheckpoint = 0; private volatile long currentIndexMetadataVersion = 0; - private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed()); + private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler) { @@ -78,10 +78,10 @@ 
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); } - void start(long followerGlobalCheckpoint) { + void start(long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { this.lastRequestedSeqno = followerGlobalCheckpoint; this.followerGlobalCheckpoint = followerGlobalCheckpoint; - this.leaderGlobalCheckpoint = followerGlobalCheckpoint; + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; // Forcefully updates follower mapping, this gets us the leader imd version and // makes sure that leader and follower mapping are identical. @@ -93,7 +93,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { }); } - private synchronized void coordinateReads() { + synchronized void coordinateReads() { if (isStopped()) { LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId()); return; @@ -105,7 +105,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) { numConcurrentReads++; long from = lastRequestedSeqno + 1; - long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount); + // -1 is needed, because maxRequiredSeqno is inclusive + long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1); LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount); sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno); lastRequestedSeqno = maxRequiredSeqno; @@ -137,6 +138,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } private synchronized void coordinateWrites() { + if (isStopped()) { + LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId()); + return; + } + while (hasWriteBudget() && buffer.isEmpty() == false) { long sumEstimatedSize = 0L; int 
length = Math.min(params.getMaxBatchOperationCount(), buffer.size()); @@ -176,48 +182,48 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { e -> handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter))); } - private void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping(response.getIndexMetadataVersion(), () -> { - synchronized (ShardFollowNodeTask.this) { - leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); - final long newMinRequiredSeqNo; - if (response.getOperations().length == 0) { - newMinRequiredSeqNo = from; - } else { - assert response.getOperations()[0].seqNo() == from : - "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0]; - buffer.addAll(Arrays.asList(response.getOperations())); - final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); - assert maxSeqNo== - Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); - newMinRequiredSeqNo = maxSeqNo + 1; - // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again. 
- lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo); - assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno + - "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]"; - coordinateWrites(); - } + void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { + maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); + } - if (newMinRequiredSeqNo < maxRequiredSeqNo) { - int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1; - LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...", - params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo); - sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo); - } else { - // read is completed, decrement - numConcurrentReads--; - if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno) { - // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay - // future requests - LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", - params.getFollowShardId()); - scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads); - } else { - coordinateReads(); - } - } + synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { + leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); + final long newMinRequiredSeqNo; + if (response.getOperations().length == 0) { + newMinRequiredSeqNo = from; + } else { + assert response.getOperations()[0].seqNo() == from : + "first operation is not what we asked for. 
From is [" + from + "], got " + response.getOperations()[0]; + buffer.addAll(Arrays.asList(response.getOperations())); + final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); + assert maxSeqNo == + Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); + newMinRequiredSeqNo = maxSeqNo + 1; + // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again. + lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo); + assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno + + "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]"; + coordinateWrites(); + } + + if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) { + int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1; + LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...", + params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo); + sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo); + } else { + // read is completed, decrement + numConcurrentReads--; + if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno) { + // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay + // future requests + LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", + params.getFollowShardId()); + scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads); + } else { + coordinateReads(); } - }); + } } private void sendBulkShardOperationsRequest(List operations) { @@ -306,7 +312,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { @Override public Status getStatus() { - return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, 
numConcurrentReads, numConcurrentWrites); + return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numConcurrentReads, numConcurrentWrites, + currentIndexMetadataVersion); } public static class Status implements Task.Status { @@ -318,9 +325,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField("last_requested_seqno"); static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); + static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version"); static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4])); + args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4], (long) args[5])); static { PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); @@ -328,6 +336,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); } private final long leaderGlobalCheckpoint; @@ -335,14 +344,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final long followerGlobalCheckpoint; private final int numberOfConcurrentReads; private final int numberOfConcurrentWrites; + private final long indexMetadataVersion; Status(long 
leaderGlobalCheckpoint, long lastRequestedSeqno, long followerGlobalCheckpoint, - int numberOfConcurrentReads, int numberOfConcurrentWrites) { + int numberOfConcurrentReads, int numberOfConcurrentWrites, long indexMetadataVersion) { this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.lastRequestedSeqno = lastRequestedSeqno; this.followerGlobalCheckpoint = followerGlobalCheckpoint; this.numberOfConcurrentReads = numberOfConcurrentReads; this.numberOfConcurrentWrites = numberOfConcurrentWrites; + this.indexMetadataVersion = indexMetadataVersion; } public Status(StreamInput in) throws IOException { @@ -351,6 +362,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.followerGlobalCheckpoint = in.readZLong(); this.numberOfConcurrentReads = in.readVInt(); this.numberOfConcurrentWrites = in.readVInt(); + this.indexMetadataVersion = in.readVLong(); } public long getLeaderGlobalCheckpoint() { @@ -373,6 +385,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { return numberOfConcurrentWrites; } + public long getIndexMetadataVersion() { + return indexMetadataVersion; + } + @Override public String getWriteableName() { return NAME; @@ -385,6 +401,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { out.writeZLong(followerGlobalCheckpoint); out.writeVInt(numberOfConcurrentReads); out.writeVInt(numberOfConcurrentWrites); + out.writeVLong(indexMetadataVersion); } @Override @@ -396,6 +413,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { builder.field(LAST_REQUESTED_SEQNO_FIELD.getPreferredName(), lastRequestedSeqno); builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); + builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion); } builder.endObject(); return builder; @@ -414,13 +432,14 @@ public 
abstract class ShardFollowNodeTask extends AllocatedPersistentTask { lastRequestedSeqno == status.lastRequestedSeqno && followerGlobalCheckpoint == status.followerGlobalCheckpoint && numberOfConcurrentReads == status.numberOfConcurrentReads && - numberOfConcurrentWrites == status.numberOfConcurrentWrites; + numberOfConcurrentWrites == status.numberOfConcurrentWrites && + indexMetadataVersion == status.indexMetadataVersion; } @Override public int hashCode() { return Objects.hash(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numberOfConcurrentReads, - numberOfConcurrentWrites); + numberOfConcurrentWrites, indexMetadataVersion); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index a3ad7aa547f..a6776d1c150 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -142,7 +142,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor shardFollowNodeTask.start(followerGCP, followerGCP), task::markAsFailed); } private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index adbd738725a..5c11c43dcbc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -48,7 +48,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { final Translog.Operation[] operations = 
ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); - final List expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList()); + final List expectedSeqNos = LongStream.range(min, max).boxed().collect(Collectors.toList()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index b550679ebee..7f5223e1cda 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -21,7 +21,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< @Override protected ShardFollowNodeTask.Status createTestInstance() { return new ShardFollowNodeTask.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE)); + randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong()); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 351044cadcc..35b00a61571 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -5,205 +5,683 @@ */ package org.elasticsearch.xpack.ccr.action; -import 
org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; -import org.junit.After; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class ShardFollowNodeTaskTests extends ESTestCase { - private ShardFollowNodeTask task; + private Exception fatalError; + private List shardChangesRequests; + private List> bulkShardOperationRequests; + private BiConsumer scheduler = (delay, task) -> task.run(); - private AtomicLong leaderGlobalCheckPoint; - private AtomicLong imdVersion; - private AtomicInteger mappingUpdateCounter; + private Queue readFailures; + private Queue writeFailures; + private Queue mappingUpdateFailures; + private Queue imdVersions; + private Queue followerGlobalCheckpoints; - private AtomicInteger truncatedRequests; - 
private AtomicBoolean randomlyTruncateRequests; + public void testCoordinateReads() { + ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); - private AtomicInteger failedRequests; - private AtomicBoolean randomlyFailWithRetryableError; + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(8)); + assertThat(shardChangesRequests, contains(new long[][]{ + {0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}} + )); - private AtomicReference failureHolder = new AtomicReference<>(); - - public void testDefaults() throws Exception { - long followGlobalCheckpoint = randomIntBetween(-1, 2048); - task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, - ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, - 10000, ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, followGlobalCheckpoint); - task.start(followGlobalCheckpoint); - - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); - }); - assertThat(mappingUpdateCounter.get(), equalTo(1)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(8)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); } - public void testHitBufferLimit() throws Exception { - // Setting buffer limit to 100, so that we are sure the limit will be met - task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, 3, 1, 10000, 100, -1); - task.start(-1); + public void testWriteBuffer() { + // Need to set concurrentWrites to 0, otherwise the write buffer gets flushed immediately: + ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE); + startTask(task, 64, -1); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); - }); +
task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + // Also invokes the coordinateReads() method: + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L)); } - public void testConcurrentReadsAndWrites() throws Exception { - long followGlobalCheckpoint = randomIntBetween(-1, 2048); - task = createShardFollowTask(randomIntBetween(32, 2048), randomIntBetween(2, 10), - randomIntBetween(2, 10), 50000, 10240, followGlobalCheckpoint); - task.start(followGlobalCheckpoint); + public void testMaxConcurrentReads() { + ShardFollowNodeTask task = createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(50000L)); - }); + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(8L)); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(7L)); } - public void testMappingUpdate() throws Exception { - task = createShardFollowTask(1024, 1, 1, 1000, 1024, -1); - task.start(-1); + public void testTaskCancelled() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE,
Long.MAX_VALUE); + startTask(task, 64, -1); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), greaterThanOrEqualTo(1000L)); - }); - imdVersion.set(2L); - leaderGlobalCheckPoint.set(10000L); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); - }); - assertThat(mappingUpdateCounter.get(), equalTo(2)); + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + // The call to updateMapping is a noop, so nothing happens. + task.start(128L, task.getStatus().getFollowerGlobalCheckpoint()); + task.markAsCompleted(); + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(0)); } - public void testOccasionalApiFailure() throws Exception { - task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1); - task.start(-1); - randomlyFailWithRetryableError.set(true); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); - }); - assertThat(failedRequests.get(), greaterThan(0)); + public void testTaskCancelledAfterReadLimitHasBeenReached() { + ShardFollowNodeTask task = createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 31, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(16L)); + + task.markAsCompleted(); + shardChangesRequests.clear(); + // Also invokes the coordinateReads() method: + task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 31L)); + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled + assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
+ ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(15L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(31L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); } - public void testNotAllExpectedOpsReturned() throws Exception { - task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1); - task.start(-1); - randomlyTruncateRequests.set(true); - assertBusy(() -> { - assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); - }); - assertThat(truncatedRequests.get(), greaterThan(0)); + public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, 32, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + task.markAsCompleted(); + shardChangesRequests.clear(); + // Also invokes the coordinateReads() method: + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled + assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(128L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); } - ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int
maxConcurrentWriteBathces, - int globalCheckpoint, int bufferWriteLimit, long followGlobalCheckpoint) { - leaderGlobalCheckPoint = new AtomicLong(globalCheckpoint); - imdVersion = new AtomicLong(1L); - mappingUpdateCounter = new AtomicInteger(0); - randomlyTruncateRequests = new AtomicBoolean(false); - truncatedRequests = new AtomicInteger(); - randomlyFailWithRetryableError = new AtomicBoolean(false); - failedRequests = new AtomicInteger(0); + public void testReceiveRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + int max = randomIntBetween(1, 10); + for (int i = 0; i < max; i++) { + readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + } + task.coordinateReads(); + + // Number of requests is equal to initial request + retried attempts + assertThat(shardChangesRequests.size(), equalTo(max + 1)); + for (long[] shardChangesRequest : shardChangesRequests) { + assertThat(shardChangesRequest[0], equalTo(0L)); + assertThat(shardChangesRequest[1], equalTo(64L)); + } + + assertThat(task.isStopped(), equalTo(false)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testReceiveRetryableErrorRetriedTooManyTimes() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + int max = randomIntBetween(11, 32); + for (int i = 0; i < max; i++) { + readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + } + task.coordinateReads(); + + assertThat(shardChangesRequests.size(), equalTo(11)); + for (long[] shardChangesRequest : shardChangesRequests) { + assertThat(shardChangesRequest[0],
equalTo(0L)); + assertThat(shardChangesRequest[1], equalTo(64L)); + } + + assertThat(task.isStopped(), equalTo(true)); + assertThat(fatalError, notNullValue()); + assertThat(fatalError.getMessage(), containsString("retrying failed [")); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testReceiveNonRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + Exception failure = new RuntimeException(); + readFailures.add(failure); + task.coordinateReads(); + + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + assertThat(task.isStopped(), equalTo(true)); + assertThat(fatalError, sameInstance(failure)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testHandleReadResponse() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + ShardFollowNodeTask.Status status = task.getStatus(); + 
assertThat(status.getIndexMetadataVersion(), equalTo(0L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(64L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testReceiveLessThanRequested() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L); + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(32L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testCancelAndReceiveLessThanRequested() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + task.markAsCompleted(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 
32, 0L, 31L); + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(shardChangesRequests.size(), equalTo(0)); + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testReceiveNothingExpectedSomething() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L); + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testDelayCoordinatesRead() { + int[] counter = new int[]{0}; + scheduler = (delay, task) -> { + counter[0]++; + task.run(); + }; + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], 
equalTo(64L)); + + shardChangesRequests.clear(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L); + // Also invokes coordinateReads() + task.innerHandleReadResponse(0L, 64L, response); + response = generateShardChangesResponse(0, 0, 0L, 64L); + task.innerHandleReadResponse(65L, 64L, response); + assertThat(counter[0], equalTo(1)); + } + + public void testMappingUpdate() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + imdVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getIndexMetadataVersion(), equalTo(1L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(64L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testMappingUpdateRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + int max = randomIntBetween(1, 10); + for (int i = 0; i < max; i++) { + mappingUpdateFailures.add(new ConnectException()); + } + imdVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(mappingUpdateFailures.size(), equalTo(0)); + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(task.isStopped(), equalTo(false)); + 
ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getIndexMetadataVersion(), equalTo(1L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(64L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + + } + + public void testMappingUpdateRetryableErrorRetriedTooManyTimes() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + int max = randomIntBetween(11, 20); + for (int i = 0; i < max; i++) { + mappingUpdateFailures.add(new ConnectException()); + } + imdVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(mappingUpdateFailures.size(), equalTo(max - 11)); + assertThat(imdVersions.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + assertThat(task.isStopped(), equalTo(true)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getIndexMetadataVersion(), equalTo(0L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testMappingUpdateNonRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + mappingUpdateFailures.add(new RuntimeException()); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + assertThat(task.isStopped(), equalTo(true)); + 
ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getIndexMetadataVersion(), equalTo(0L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testCoordinateWrites() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(64L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testMaxConcurrentWrites() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(2)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()).subList(0, 64))); + assertThat(bulkShardOperationRequests.get(1), 
equalTo(Arrays.asList(response.getOperations()).subList(64, 128))); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(2)); + + task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE); + response = generateShardChangesResponse(0, 256, 0L, 256L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(4)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()).subList(0, 64))); + assertThat(bulkShardOperationRequests.get(1), equalTo(Arrays.asList(response.getOperations()).subList(64, 128))); + assertThat(bulkShardOperationRequests.get(2), equalTo(Arrays.asList(response.getOperations()).subList(128, 192))); + assertThat(bulkShardOperationRequests.get(3), equalTo(Arrays.asList(response.getOperations()).subList(192, 256))); + + status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(4)); + } + + public void testMaxBatchOperationCount() { + ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(32)); + for (int i = 0; i < 32; i += 8) { + int offset = i * 8; + assertThat(bulkShardOperationRequests.get(i), equalTo(Arrays.asList(response.getOperations()).subList(offset, offset + 8))); + } + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(32)); + } + + public void testRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + 
assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + int max = randomIntBetween(1, 10); + for (int i = 0; i < max; i++) { + writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + } + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + // Number of requests is equal to initial request + retried attempts: + assertThat(bulkShardOperationRequests.size(), equalTo(max + 1)); + for (List operations : bulkShardOperationRequests) { + assertThat(operations, equalTo(Arrays.asList(response.getOperations()))); + } + assertThat(task.isStopped(), equalTo(false)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testRetryableErrorRetriedTooManyTimes() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + int max = randomIntBetween(11, 32); + for (int i = 0; i < max; i++) { + writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + } + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + // Number of requests is equal to initial request + retried attempts: + assertThat(bulkShardOperationRequests.size(), equalTo(11)); + for (List operations : bulkShardOperationRequests) { + assertThat(operations, equalTo(Arrays.asList(response.getOperations()))); + } + assertThat(task.isStopped(), 
equalTo(true)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testNonRetryableError() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + writeFailures.add(new RuntimeException()); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + assertThat(task.isStopped(), equalTo(true)); + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testMaxBatchBytesLimit() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, 1L); + startTask(task, 64, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(64)); + } + + public void testHandleWriteResponse() { + ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + 
assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + followerGlobalCheckpoints.add(63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L); + // Also invokes coordinatesWrites() + task.innerHandleReadResponse(0L, 63L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + // handleWrite() also delegates to coordinateReads + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(64L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.getFollowerGlobalCheckpoint(), equalTo(63L)); + } + + ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, + int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, - ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, maxConcurrentWriteBathces, bufferWriteLimit, - TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10), Collections.emptyMap()); + new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes, + maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap()); - BiConsumer scheduler = (delay, task) -> { - try { - 
Thread.sleep(delay.millis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - Thread thread = new Thread(task); - thread.start(); - }; - AtomicInteger readCounter = new AtomicInteger(); - AtomicInteger writeCounter = new AtomicInteger(); - LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint); + shardChangesRequests = new ArrayList<>(); + bulkShardOperationRequests = new ArrayList<>(); + readFailures = new LinkedList<>(); + writeFailures = new LinkedList<>(); + mappingUpdateFailures = new LinkedList<>(); + imdVersions = new LinkedList<>(); + followerGlobalCheckpoints = new LinkedList<>(); return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { - mappingUpdateCounter.incrementAndGet(); - handler.accept(imdVersion.get()); + Exception failure = mappingUpdateFailures.poll(); + if (failure != null) { + errorHandler.accept(failure); + return; + } + + Long imdVersion = imdVersions.poll(); + if (imdVersion != null) { + handler.accept(imdVersion); + } } @Override protected void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, Consumer errorHandler) { - if (randomlyFailWithRetryableError.get() && readCounter.incrementAndGet() % 5 == 0) { - failedRequests.incrementAndGet(); - errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + bulkShardOperationRequests.add(operations); + Exception writeFailure = ShardFollowNodeTaskTests.this.writeFailures.poll(); + if (writeFailure != null) { + errorHandler.accept(writeFailure); return; } - - for(Translog.Operation op : operations) { - tracker.markSeqNoAsCompleted(op.seqNo()); + Long followerGlobalCheckpoint = followerGlobalCheckpoints.poll(); + if (followerGlobalCheckpoint != null) { + handler.accept(followerGlobalCheckpoint); } 
- - // Emulate network thread and avoid SO: - Thread thread = new Thread(() -> handler.accept(tracker.getCheckpoint())); - thread.start(); } @Override protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler) { - if (randomlyFailWithRetryableError.get() && writeCounter.incrementAndGet() % 5 == 0) { - failedRequests.incrementAndGet(); - errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); - return; + shardChangesRequests.add(new long[]{from, maxBatchOperationCount}); + Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll(); + if (readFailure != null) { + errorHandler.accept(readFailure); } - - if (from < 0) { - errorHandler.accept(new IllegalArgumentException()); - return; - } - - ShardChangesAction.Response response; - if (from > leaderGlobalCheckPoint.get()) { - response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(), new Translog.Operation[0]); - } else { - if (randomlyTruncateRequests.get() && maxOperationCount > 10 && truncatedRequests.get() < 5) { - truncatedRequests.incrementAndGet(); - maxOperationCount = maxOperationCount / 2; - } - List ops = new ArrayList<>(); - long maxSeqNo = Math.min(from + maxOperationCount, leaderGlobalCheckPoint.get()); - for (long seqNo = from; seqNo <= maxSeqNo; seqNo++) { - String id = UUIDs.randomBase64UUID(); - byte[] source = "{}".getBytes(StandardCharsets.UTF_8); - ops.add(new Translog.Index("doc", id, seqNo, 0, source)); - } - response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(), - ops.toArray(new Translog.Operation[0])); - } - // Emulate network thread and avoid SO: - Thread thread = new Thread(() -> handler.accept(response)); - thread.start(); } @Override @@ -218,23 +696,27 @@ public class ShardFollowNodeTaskTests extends ESTestCase { @Override public void markAsFailed(Exception e) { + fatalError = e; stopped.set(true); - 
failureHolder.set(e); } }; } - @After - public void cancelNodeTask() throws Exception { - if (task != null){ - task.markAsCompleted(); - assertThat(failureHolder.get(), nullValue()); - assertBusy(() -> { - ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); - assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); - }); + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion, + long leaderGlobalCheckPoint) { + List ops = new ArrayList<>(); + for (long seqNo = fromSeqNo; seqNo < size; seqNo++) { + String id = UUIDs.randomBase64UUID(); + byte[] source = "{}".getBytes(StandardCharsets.UTF_8); + ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } + return new ShardChangesAction.Response(imdVersion, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); } + void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { + // The call to updateMapping is a noop, so nothing happens. 
+ task.start(leaderGlobalCheckpoint, followerGlobalCheckpoint); + } + + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index da957e7ee5e..bad85211f41 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -45,7 +45,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest leaderGroup.assertAllEqual(docCount); followerGroup.startAll(); ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint()); docCount += leaderGroup.appendDocs(randomInt(128)); leaderGroup.syncGlobalCheckpoint(); @@ -62,7 +62,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest leaderGroup.startAll(); followerGroup.startAll(); ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + shardFollowTask.start(leaderGroup.getPrimary().getGlobalCheckpoint(), followerGroup.getPrimary().getGlobalCheckpoint()); int docCount = 256; leaderGroup.appendDocs(1); Runnable task = () -> {