CCR: Fix incorrect read request completion condition (#32266)

Today we consider a read request is exhausted if from_seqno is equal to
or greater than the max_required_seqno. However, if we stop when
from_seqno equals to the max_required_seqno, we will miss an operation
whose seqno is max_required_seqno because we have not seen that 
operation yet.
This commit is contained in:
Nhat Nguyen 2018-07-22 22:14:27 -04:00 committed by GitHub
parent 8e66a937c9
commit 88190299df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 52 deletions

View File

@ -188,9 +188,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
final long newMinRequiredSeqNo;
final long newFromSeqNo;
if (response.getOperations().length == 0) {
newMinRequiredSeqNo = from;
newFromSeqNo = from;
} else {
assert response.getOperations()[0].seqNo() == from :
"first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
@ -198,19 +198,18 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
assert maxSeqNo ==
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
newMinRequiredSeqNo = maxSeqNo + 1;
newFromSeqNo = maxSeqNo + 1;
// update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo);
assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
coordinateWrites();
}
if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) {
int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1;
if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo);
sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo);
params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo);
sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo);
} else {
// read is completed, decrement
numConcurrentReads--;

View File

@ -62,7 +62,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testWriteBuffer() {
// Need to set concurrentWrites to 0, other the write buffer gets flushed immediately:
ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
@ -234,11 +234,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testHandleReadResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
task.innerHandleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
task.innerHandleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@ -248,8 +248,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
@ -263,7 +263,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(shardChangesRequests.size(), equalTo(1));
@ -288,7 +288,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
shardChangesRequests.clear();
task.markAsCompleted();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);
assertThat(shardChangesRequests.size(), equalTo(0));
@ -310,8 +310,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L);
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 64L,
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@ -331,7 +331,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
task.run();
};
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
@ -339,22 +339,22 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinateReads()
task.innerHandleReadResponse(0L, 64L, response);
response = generateShardChangesResponse(0, 0, 0L, 64L);
task.innerHandleReadResponse(65L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
task.innerHandleReadResponse(64L, 63L,
new ShardChangesAction.Response(0, 63L, new Translog.Operation[0]));
assertThat(counter[0], equalTo(1));
}
public void testMappingUpdate() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
task.handleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@ -363,14 +363,14 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
public void testMappingUpdateRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
int max = randomIntBetween(1, 10);
for (int i = 0; i < max; i++) {
@ -378,8 +378,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
task.handleReadResponse(0L, 63L, response);
assertThat(mappingUpdateFailures.size(), equalTo(0));
assertThat(bulkShardOperationRequests.size(), equalTo(1));
@ -388,8 +388,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
}
@ -439,16 +439,16 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testCoordinateWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@ -456,8 +456,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}
@ -507,7 +507,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
@ -518,9 +518,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(max + 1));
@ -535,7 +535,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
@ -546,9 +546,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(11));
@ -563,7 +563,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
public void testNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
@ -571,9 +571,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
writeFailures.add(new RuntimeException());
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@ -592,7 +592,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
@ -610,7 +610,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
shardChangesRequests.clear();
followerGlobalCheckpoints.add(63L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 63L, response);
@ -702,10 +702,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
};
}
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion,
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion,
long leaderGlobalCheckPoint) {
List<Translog.Operation> ops = new ArrayList<>();
for (long seqNo = fromSeqNo; seqNo < size; seqNo++) {
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
String id = UUIDs.randomBase64UUID();
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));

View File

@ -35,6 +35,8 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import static org.hamcrest.Matchers.equalTo;
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
public void testSimpleCcrReplication() throws Exception {
@ -51,7 +53,10 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leaderGroup.assertAllEqual(docCount);
int expectedCount = docCount;
assertBusy(() -> followerGroup.assertAllEqual(expectedCount));
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(expectedCount);
});
shardFollowTask.markAsCompleted();
}
}