diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 492a7e492ff..0be7c392de2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -188,9 +188,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); - final long newMinRequiredSeqNo; + final long newFromSeqNo; if (response.getOperations().length == 0) { - newMinRequiredSeqNo = from; + newFromSeqNo = from; } else { assert response.getOperations()[0].seqNo() == from : "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0]; @@ -198,19 +198,18 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); assert maxSeqNo == Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); - newMinRequiredSeqNo = maxSeqNo + 1; + newFromSeqNo = maxSeqNo + 1; // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again. lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo); assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno + "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]"; coordinateWrites(); } - - if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) { - int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1; + if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) { + int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1); LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...", - params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo); - sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo); + params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo); + sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo); } else { // read is completed, decrement numConcurrentReads--; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 35b00a61571..bbe4ac6806e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -62,7 +62,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testWriteBuffer() { // Need to set concurrentWrites to 0, other the write buffer gets flushed immediately: ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -234,11 +234,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testHandleReadResponse() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); - task.innerHandleReadResponse(0L, 64L, response); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); @@ -248,8 +248,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); - assertThat(status.getLastRequestedSeqno(), equalTo(64L)); - assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); } @@ -263,7 +263,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L); task.innerHandleReadResponse(0L, 64L, response); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -288,7 +288,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { shardChangesRequests.clear(); task.markAsCompleted(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L); task.innerHandleReadResponse(0L, 64L, response); assertThat(shardChangesRequests.size(), equalTo(0)); @@ -310,8 +310,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L); - task.innerHandleReadResponse(0L, 64L, response); + task.innerHandleReadResponse(0L, 64L, + new ShardChangesAction.Response(0, 0, new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -331,7 +331,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { task.run(); }; ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -339,22 +339,22 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); // Also invokes coordinateReads() - task.innerHandleReadResponse(0L, 64L, response); - response = generateShardChangesResponse(0, 0, 0L, 64L); - task.innerHandleReadResponse(65L, 64L, response); + task.innerHandleReadResponse(0L, 63L, response); + task.innerHandleReadResponse(64L, 63L, + new ShardChangesAction.Response(0, 63L, new Translog.Operation[0])); assertThat(counter[0], equalTo(1)); } public void testMappingUpdate() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); imdVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); - task.handleReadResponse(0L, 64L, response); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + task.handleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); @@ -363,14 +363,14 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.getIndexMetadataVersion(), equalTo(1L)); assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); - assertThat(status.getLastRequestedSeqno(), equalTo(64L)); - assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); } public void testMappingUpdateRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); int max = randomIntBetween(1, 10); for (int i = 0; i < max; i++) { @@ -378,8 +378,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } imdVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); - task.handleReadResponse(0L, 64L, response); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + task.handleReadResponse(0L, 63L, response); assertThat(mappingUpdateFailures.size(), equalTo(0)); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -388,8 +388,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.getIndexMetadataVersion(), equalTo(1L)); assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); - assertThat(status.getLastRequestedSeqno(), equalTo(64L)); - assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); } @@ -439,16 +439,16 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testCoordinateWrites() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); // Also invokes coordinatesWrites() - task.innerHandleReadResponse(0L, 64L, response); + task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); @@ -456,8 +456,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); assertThat(status.getNumberOfConcurrentWrites(), equalTo(1)); - assertThat(status.getLastRequestedSeqno(), equalTo(64L)); - assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L)); + assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L)); assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L)); } @@ -507,7 +507,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -518,9 +518,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase { for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); // Also invokes coordinatesWrites() - task.innerHandleReadResponse(0L, 64L, response); + task.innerHandleReadResponse(0L, 63L, response); // Number of requests is equal to initial request + retried attempts: assertThat(bulkShardOperationRequests.size(), equalTo(max + 1)); @@ -535,7 +535,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testRetryableErrorRetriedTooManyTimes() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -546,9 +546,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase { for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643); // Also invokes coordinatesWrites() - task.innerHandleReadResponse(0L, 64L, response); + task.innerHandleReadResponse(0L, 63L, response); // Number of requests is equal to initial request + retried attempts: assertThat(bulkShardOperationRequests.size(), equalTo(11)); @@ -563,7 +563,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { public void testNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); + startTask(task, 63, -1); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -571,9 +571,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); writeFailures.add(new RuntimeException()); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); // Also invokes coordinatesWrites() - task.innerHandleReadResponse(0L, 64L, response); + task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); @@ -592,7 +592,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -610,7 +610,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { shardChangesRequests.clear(); followerGlobalCheckpoints.add(63L); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -702,10 +702,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase { }; } - private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion, + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); - for (long seqNo = fromSeqNo; seqNo < size; seqNo++) { + for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { String id = UUIDs.randomBase64UUID(); byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index bad85211f41..8b7f01f3885 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -35,6 +35,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import static org.hamcrest.Matchers.equalTo; + public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { public void testSimpleCcrReplication() throws Exception { @@ -51,7 +53,10 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest leaderGroup.assertAllEqual(docCount); int expectedCount = docCount; - assertBusy(() -> followerGroup.assertAllEqual(expectedCount)); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(expectedCount); + }); shardFollowTask.markAsCompleted(); } }