Ensure CCR partial reads never overuse buffer (#58620)

When the documents are large, a follower can receive a partial response 
because the requesting range of operations is capped by
max_read_request_size instead of max_read_request_operation_count. In
this case, the follower will continue reading the subsequent ranges
without checking the remaining size of the buffer. The buffer then can
use more memory than max_write_buffer_size and even causes OOM.

Backport of #58620
This commit is contained in:
Nhat Nguyen 2020-07-01 13:23:28 -04:00 committed by GitHub
parent 98a62a6b2d
commit f63cbad629
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 11 deletions

View File

@ -91,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long failedWriteRequests = 0;
private long operationWritten = 0;
private long lastFetchTime = -1;
private final Queue<Tuple<Long, Long>> partialReadRequests = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private long bufferSizeInBytes = 0;
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
@ -188,6 +189,20 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
LOGGER.trace("{} coordinate reads, lastRequestedSeqNo={}, leaderGlobalCheckpoint={}",
params.getFollowShardId(), lastRequestedSeqNo, leaderGlobalCheckpoint);
assert partialReadRequests.size() <= params.getMaxOutstandingReadRequests() :
"too many partial read requests [" + partialReadRequests + "]";
while (hasReadBudget() && partialReadRequests.isEmpty() == false) {
final Tuple<Long, Long> range = partialReadRequests.remove();
assert range.v1() <= range.v2() && range.v2() <= lastRequestedSeqNo :
"invalid partial range [" + range.v1() + "," + range.v2() + "]; last requested seq_no [" + lastRequestedSeqNo + "]";
final long fromSeqNo = range.v1();
final long maxRequiredSeqNo = range.v2();
final int requestOpCount = Math.toIntExact(maxRequiredSeqNo - fromSeqNo + 1);
LOGGER.trace("{}[{} ongoing reads] continue partial read request from_seqno={} max_required_seqno={} batch_count={}",
params.getFollowShardId(), numOutstandingReads, fromSeqNo, maxRequiredSeqNo, requestOpCount);
numOutstandingReads++;
sendShardChangesRequest(fromSeqNo, requestOpCount, maxRequiredSeqNo);
}
final int maxReadRequestOperationCount = params.getMaxReadRequestOperationCount();
while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
final long from = lastRequestedSeqNo + 1;
@ -203,8 +218,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}",
params.getFollowShardId(), numOutstandingReads, from, maxRequiredSeqNo, requestOpCount);
numOutstandingReads++;
sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
lastRequestedSeqNo = maxRequiredSeqNo;
sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
}
if (numOutstandingReads == 0 && hasReadBudget()) {
@ -220,6 +235,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private boolean hasReadBudget() {
assert Thread.holdsLock(this);
// TODO: To ensure that we never overuse the buffer, we need to
// - Overestimate the size and count of the responses of the outstanding request when calculating the budget
// - Limit the size and count of next read requests by the remaining size and count of the buffer
if (numOutstandingReads >= params.getMaxOutstandingReadRequests()) {
LOGGER.trace("{} no new reads, maximum number of concurrent reads have been reached [{}]",
params.getFollowShardId(), numOutstandingReads);
@ -229,7 +247,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes);
return false;
}
if (buffer.size() > params.getMaxWriteBufferCount()) {
if (buffer.size() >= params.getMaxWriteBufferCount()) {
LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size());
return false;
}
@ -374,16 +392,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
coordinateWrites();
}
if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
if (newFromSeqNo <= maxRequiredSeqNo) {
LOGGER.trace("{} received [{}] operations, enqueue partial read request [{}/{}]",
params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo);
sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo);
} else {
// read is completed, decrement
numOutstandingReads--;
coordinateReads();
partialReadRequests.add(Tuple.tuple(newFromSeqNo, maxRequiredSeqNo));
}
numOutstandingReads--;
coordinateReads();
}
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -485,6 +486,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
request.setFollowerIndex(followerIndex);
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
request.getParameters().setMaxReadRequestSize(new ByteSizeValue(between(1, 32 * 1024 * 1024)));
request.getParameters().setMaxReadRequestOperationCount(between(1, 10000));
request.waitForActiveShards(waitForActiveShards);
return request;
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
@ -79,6 +80,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private Queue<Long> followerGlobalCheckpoints;
private Queue<Long> maxSeqNos;
private Queue<Integer> responseSizes;
private Queue<ActionListener<BulkShardOperationsResponse>> pendingBulkShardRequests;
public void testCoordinateReads() {
ShardFollowTaskParams params = new ShardFollowTaskParams();
@ -599,6 +601,55 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testHandlePartialResponses() {
ShardFollowTaskParams params = new ShardFollowTaskParams();
params.maxReadRequestOperationCount = 10;
params.maxOutstandingReadRequests = 2;
params.maxOutstandingWriteRequests = 1;
params.maxWriteBufferCount = 3;
ShardFollowNodeTask task = createShardFollowTask(params);
startTask(task, 99, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(2));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(10L));
assertThat(shardChangesRequests.get(1)[0], equalTo(10L));
assertThat(shardChangesRequests.get(1)[1], equalTo(10L));
task.innerHandleReadResponse(0L, 9L, generateShardChangesResponse(0L, 5L, 0L, 0L, 1L, 99L));
assertThat(pendingBulkShardRequests, hasSize(1));
assertThat("continue the partial request", shardChangesRequests, hasSize(3));
assertThat(shardChangesRequests.get(2)[0], equalTo(6L));
assertThat(shardChangesRequests.get(2)[1], equalTo(4L));
assertThat(pendingBulkShardRequests, hasSize(1));
task.innerHandleReadResponse(10, 19L, generateShardChangesResponse(10L, 17L, 0L, 0L, 1L, 99L));
assertThat("do not continue partial reads as the buffer is full", shardChangesRequests, hasSize(3));
task.innerHandleReadResponse(6L, 9L, generateShardChangesResponse(6L, 8L, 0L, 0L, 1L, 99L));
assertThat("do not continue partial reads as the buffer is full", shardChangesRequests, hasSize(3));
pendingBulkShardRequests.remove().onResponse(new BulkShardOperationsResponse());
assertThat(pendingBulkShardRequests, hasSize(1));
assertThat("continue two partial requests as the buffer is empty after sending", shardChangesRequests, hasSize(5));
assertThat(shardChangesRequests.get(3)[0], equalTo(9L));
assertThat(shardChangesRequests.get(3)[1], equalTo(1L));
assertThat(shardChangesRequests.get(4)[0], equalTo(18L));
assertThat(shardChangesRequests.get(4)[1], equalTo(2L));
task.innerHandleReadResponse(18L, 19L, generateShardChangesResponse(18L, 19L, 0L, 0L, 1L, 99L));
assertThat("start new range as the buffer has empty slots", shardChangesRequests, hasSize(6));
assertThat(shardChangesRequests.get(5)[0], equalTo(20L));
assertThat(shardChangesRequests.get(5)[1], equalTo(10L));
task.innerHandleReadResponse(9L, 9L, generateShardChangesResponse(9L, 9L, 0L, 0L, 1L, 99L));
assertThat("do not start new range as the buffer is full", shardChangesRequests, hasSize(6));
pendingBulkShardRequests.remove().onResponse(new BulkShardOperationsResponse());
assertThat("start new range as the buffer is empty after sending", shardChangesRequests, hasSize(7));
assertThat(shardChangesRequests.get(6)[0], equalTo(30L));
assertThat(shardChangesRequests.get(6)[1], equalTo(10L));
}
public void testMappingUpdate() {
ShardFollowTaskParams params = new ShardFollowTaskParams();
params.maxReadRequestOperationCount = 64;
@ -996,7 +1047,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
assertThat(bulkShardOperationRequests.size(), equalTo(64));
}
@ -1122,6 +1173,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
followerGlobalCheckpoints = new LinkedList<>();
maxSeqNos = new LinkedList<>();
responseSizes = new LinkedList<>();
pendingBulkShardRequests = new LinkedList<>();
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {
@ -1185,6 +1237,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
response.setGlobalCheckpoint(followerGlobalCheckpoint);
response.setMaxSeqNo(followerGlobalCheckpoint);
handler.accept(response);
} else {
pendingBulkShardRequests.add(ActionListener.wrap(handler::accept, errorHandler));
}
}