Make recovery source send operations non-blocking (#37503)
Relates #37458
@ -33,9 +33,9 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
@ -71,7 +71,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -514,13 +514,6 @@ public class RecoverySourceHandler {
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
@ -528,83 +521,87 @@ public class RecoverySourceHandler {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
final StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final AtomicInteger skippedOps = new AtomicInteger();
final AtomicInteger totalSentOps = new AtomicInteger();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
// We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
// Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
synchronized (snapshot) {
final List<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
long batchSizeInBytes = 0L;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
batchSizeInBytes += operation.estimateSize();
final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
logger.trace("no translog operations to send");
final CancellableThreads.IOInterruptible sendBatch = () -> {
// TODO: Make this non-blocking
final PlainActionFuture<Long> future = new PlainActionFuture<>();
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
// check if this request is past bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
return ops;
// send operations in batches
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
final StopWatch stopWatch = new StopWatch().start();
final ActionListener<Long> batchedListener = ActionListener.wrap(
targetLocalCheckpoint -> {
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime));
final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
size += operation.estimateSize();
sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(),
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
ops = 0;
size = 0;
private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
cancellableThreads.execute(() -> {
recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
newCheckpoint -> {
sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
} else {
if (!operations.isEmpty() || totalSentOps == 0) {
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
@ -76,6 +76,10 @@ import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.OutputStream;
@ -115,6 +119,18 @@ public class RecoverySourceHandlerTests extends ESTestCase {
private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1);
private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private ThreadPool threadPool;
public void setUpThreadPool() {
threadPool = new TestThreadPool(getTestName());
public void tearDownThreadPool() {
public void testSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
@ -198,18 +214,17 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final int fileChunkSizeInBytes = between(1, 4096);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
final int initialNumberOfDocs = randomIntBetween(10, 1000);
for (int i = 0; i < initialNumberOfDocs; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)));
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000);
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
@ -219,12 +234,14 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
final List<Translog.Operation> shippedOps = new ArrayList<>();
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
ActionListener<Long> listener) {
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
@ -239,6 +256,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
for (int i = 0; i < shippedOps.size(); i++) {
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
if (endingSeqNo >= requiredStartingSeqNo + 1) {
// check that missing ops blows up
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
@ -253,6 +271,40 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testSendSnapshotStopOnError() throws Exception {
final int fileChunkSizeInBytes = between(1, 10 * 1024);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
final List<Translog.Operation> ops = new ArrayList<>();
for (int numOps = between(1, 256), i = 0; i < numOps; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true)));
final AtomicBoolean wasFailed = new AtomicBoolean();
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
long msu, ActionListener<Long> listener) {
if (randomBoolean()) {
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
} else {
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), future);
if (wasFailed.get()) {
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
private Engine.Index getIndex(final String id) {
final String type = "test";
final ParseContext.Document document = new ParseContext.Document();
@ -717,4 +769,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
private void maybeExecuteAsync(Runnable runnable) {
if (randomBoolean()) {
} else {
