[CCR] Clear fetch exceptions if an empty but successful shard changes response returns (#34256)

Also fixed ShardFollowNodeTaskTests to not return ops when responseSize
is empty. Otherwise ops are returned when no ops are expected to be returned.

Co-authored-by: Jason Tedor <jason@tedor.me>
This commit is contained in:
Martijn van Groningen 2018-10-06 07:53:37 -04:00 committed by GitHub
parent 899e48395b
commit c6c83d19f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 6 deletions

View File

@ -238,15 +238,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
innerSendShardChangesRequest(from, maxOperationCount,
response -> {
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
synchronized (ShardFollowNodeTask.this) {
synchronized (ShardFollowNodeTask.this) {
// Always clear fetch exceptions:
fetchExceptions.remove(from);
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes +=
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
}
handleReadResponse(from, maxRequiredSeqNo, response);

View File

@ -190,6 +190,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
maxSeqNos.add(63L);
responseSizes.add(64);
simulateResponse.set(true);
final AtomicLong retryCounter = new AtomicLong();
// before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear
@ -228,6 +229,35 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testEmptyShardChangesResponseShouldClearFetchException() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, -1, -1);
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(-1L);
maxSeqNos.add(-1L);
simulateResponse.set(true);
task.coordinateReads();
// number of requests is equal to initial request + retried attempts
assertThat(shardChangesRequests.size(), equalTo(2));
for (long[] shardChangesRequest : shardChangesRequests) {
assertThat(shardChangesRequest[0], equalTo(0L));
assertThat(shardChangesRequest[1], equalTo(64L));
}
assertFalse("task is not stopped", task.isStopped());
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(1L));
// the fetch failure should have been cleared:
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
assertThat(status.lastRequestedSeqNo(), equalTo(-1L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(-1L));
}
public void testReceiveTimeout() {
final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
@ -262,6 +292,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
maxSeqNos.add(63L);
responseSizes.add(64);
simulateResponse.set(true);
task.coordinateReads();
@ -742,7 +773,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
if (readFailure != null) {
errorHandler.accept(readFailure);
} else if (simulateResponse.get()) {
final int responseSize = responseSizes.size() == 0 ? requestBatchSize : responseSizes.poll();
final int responseSize = responseSizes.size() == 0 ? 0 : responseSizes.poll();
final Translog.Operation[] operations = new Translog.Operation[responseSize];
for (int i = 0; i < responseSize; i++) {
operations[i] = new Translog.NoOp(from + i, 0, "test");