From ef5c397c0f68649117731867cce59ef078145050 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 7 Jul 2020 22:03:31 -0400 Subject: [PATCH] Sending operations concurrently in peer recovery (#58018) Today, we send operations in phase2 of peer recoveries batch by batch sequentially. Normally that's okay as we should have a fairly small number of operations in phase 2 due to the file-based threshold. However, if phase1 takes a lot of time and we are actively indexing, then phase2 can have a lot of operations to replay. With this change, we will send multiple batches concurrently (defaults to 1) to reduce the recovery time. Backport of #58018 --- .../modules/indices/recovery.asciidoc | 10 + .../common/settings/ClusterSettings.java | 1 + ...eTransfer.java => MultiChunkTransfer.java} | 79 +++--- .../recovery/PeerRecoverySourceService.java | 4 +- .../indices/recovery/RecoverySettings.java | 18 ++ .../recovery/RecoverySourceHandler.java | 224 +++++++++--------- .../recovery/RecoverySourceHandlerTests.java | 116 +++++++-- .../index/shard/IndexShardTestCase.java | 6 +- .../elasticsearch/test/ESIntegTestCase.java | 5 +- .../test/InternalTestCluster.java | 8 + .../xpack/ccr/repository/CcrRepository.java | 8 +- 11 files changed, 311 insertions(+), 168 deletions(-) rename server/src/main/java/org/elasticsearch/indices/recovery/{MultiFileTransfer.java => MultiChunkTransfer.java} (74%) diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc index 361c9ef1f09..01a6eaa8ed0 100644 --- a/docs/reference/modules/indices/recovery.asciidoc +++ b/docs/reference/modules/indices/recovery.asciidoc @@ -48,3 +48,13 @@ sent in parallel for each recovery. Defaults to `2`. + You can increase the value of this setting when the recovery of a single shard is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`. + +`indices.recovery.max_concurrent_operations`:: +(<>, Expert) Number of operations sent +in parallel for each recovery. Defaults to `1`. ++ +Concurrently replaying operations during recovery can be very resource-intensive +and may interfere with indexing, search, and other activities in your cluster. +Do not increase this setting without carefully verifying that your cluster has +the resources available to handle the extra load that will result.
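As a rough illustration of how the new dynamic setting can be changed at runtime, here is a minimal Java sketch. It assumes a node-level Client is available; the helper class and method names are made up for this example and are not part of the patch.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

final class RecoveryConcurrencyHelper {
    // Hypothetical helper: raises indices.recovery.max_concurrent_operations at runtime.
    // The patch registers the setting as dynamic and node-scoped, default 1, maximum 4.
    static void setMaxConcurrentRecoveryOperations(Client client, int value) {
        client.admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .put("indices.recovery.max_concurrent_operations", value))
            .get();
    }
}

Using setPersistentSettings, or PUT _cluster/settings over REST, works the same way.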
+ diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 2eef552cfd4..4a204f607bd 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -236,6 +236,7 @@ public final class ClusterSettings extends AbstractScopedSettings { RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, + RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiChunkTransfer.java similarity index 74% rename from server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java rename to server/src/main/java/org/elasticsearch/indices/recovery/MultiChunkTransfer.java index 3a544ba2777..99f2565a437 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiChunkTransfer.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import org.elasticsearch.index.store.StoreFileMetadata; import java.io.Closeable; import java.io.IOException; @@ -57,64 +56,64 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue * until all chunk requests are sent/responded. 
*/ -public abstract class MultiFileTransfer implements Closeable { +public abstract class MultiChunkTransfer implements Closeable { private Status status = Status.PROCESSING; private final Logger logger; private final ActionListener listener; private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - private final AsyncIOProcessor processor; - private final int maxConcurrentFileChunks; - private StoreFileMetadata currentFile = null; - private final Iterator remainingFiles; - private Tuple readAheadRequest = null; + private final AsyncIOProcessor> processor; + private final int maxConcurrentChunks; + private Source currentSource = null; + private final Iterator remainingSources; + private Tuple readAheadRequest = null; - protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener listener, - int maxConcurrentFileChunks, List files) { + protected MultiChunkTransfer(Logger logger, ThreadContext threadContext, ActionListener listener, + int maxConcurrentChunks, List sources) { this.logger = logger; - this.maxConcurrentFileChunks = maxConcurrentFileChunks; + this.maxConcurrentChunks = maxConcurrentChunks; this.listener = listener; - this.processor = new AsyncIOProcessor(logger, maxConcurrentFileChunks, threadContext) { + this.processor = new AsyncIOProcessor>(logger, maxConcurrentChunks, threadContext) { @Override - protected void write(List>> items) { + protected void write(List, Consumer>> items) throws IOException { handleItems(items); } }; - this.remainingFiles = files.iterator(); + this.remainingSources = sources.iterator(); } public final void start() { addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor } - private void addItem(long requestSeqId, StoreFileMetadata md, Exception failure) { - processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; }); + private void addItem(long requestSeqId, Source resource, Exception failure) { + processor.put(new FileChunkResponseItem<>(requestSeqId, resource, failure), e -> { assert e == null : e; }); } - private void handleItems(List>> items) { + private void handleItems(List, Consumer>> items) { if (status != Status.PROCESSING) { assert status == Status.FAILED : "must not receive any response after the transfer was completed"; // These exceptions will be ignored as we record only the first failure, log them for debugging purpose. items.stream().filter(item -> item.v1().failure != null).forEach(item -> - logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure)); + logger.debug(new ParameterizedMessage("failed to transfer a chunk request {}", item.v1().source), item.v1().failure)); return; } try { - for (Tuple> item : items) { - final FileChunkResponseItem resp = item.v1(); + for (Tuple, Consumer> item : items) { + final FileChunkResponseItem resp = item.v1(); if (resp.requestSeqId == UNASSIGNED_SEQ_NO) { continue; // not an actual item } requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId); if (resp.failure != null) { - handleError(resp.md, resp.failure); + handleError(resp.source, resp.failure); throw resp.failure; } } - while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) { - final Tuple request = readAheadRequest != null ? 
readAheadRequest : getNextRequest(); + while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentChunks) { + final Tuple request = readAheadRequest != null ? readAheadRequest : getNextRequest(); readAheadRequest = null; if (request == null) { - assert currentFile == null && remainingFiles.hasNext() == false; + assert currentSource == null && remainingSources.hasNext() == false; if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) { onCompleted(null); } @@ -149,48 +148,50 @@ public abstract class MultiFileTransfer getNextRequest() throws Exception { + private Tuple getNextRequest() throws Exception { try { - if (currentFile == null) { - if (remainingFiles.hasNext()) { - currentFile = remainingFiles.next(); - onNewFile(currentFile); + if (currentSource == null) { + if (remainingSources.hasNext()) { + currentSource = remainingSources.next(); + onNewResource(currentSource); } else { return null; } } - final StoreFileMetadata md = currentFile; + final Source md = currentSource; final Request request = nextChunkRequest(md); if (request.lastChunk()) { - currentFile = null; + currentSource = null; } return Tuple.tuple(md, request); } catch (Exception e) { - handleError(currentFile, e); + handleError(currentSource, e); throw e; } } /** - * This method is called when starting sending/requesting a new file. Subclasses should override + * This method is called when starting sending/requesting a new source. Subclasses should override * this method to reset the file offset or close the previous file and open a new file if needed. */ - protected abstract void onNewFile(StoreFileMetadata md) throws IOException; + protected void onNewResource(Source resource) throws IOException { - protected abstract Request nextChunkRequest(StoreFileMetadata md) throws IOException; + } + + protected abstract Request nextChunkRequest(Source resource) throws IOException; protected abstract void executeChunkRequest(Request request, ActionListener listener); - protected abstract void handleError(StoreFileMetadata md, Exception e) throws Exception; + protected abstract void handleError(Source resource, Exception e) throws Exception; - private static class FileChunkResponseItem { + private static class FileChunkResponseItem { final long requestSeqId; - final StoreFileMetadata md; + final Source source; final Exception failure; - FileChunkResponseItem(long requestSeqId, StoreFileMetadata md, Exception failure) { + FileChunkResponseItem(long requestSeqId, Source source, Exception failure) { this.requestSeqId = requestSeqId; - this.md = md; + this.source = source; this.failure = failure; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index ad51dc43ebb..1ce4dde52a4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -320,7 +320,9 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), 
recoverySettings.getMaxConcurrentFileChunks()); + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks(), + recoverySettings.getMaxConcurrentOperations()); return Tuple.tuple(handler, recoveryTarget); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 9abd5dcf08f..c484b3c214b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -45,6 +45,12 @@ public class RecoverySettings { public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING = Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope); + /** + * Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node. + */ + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING = + Setting.intSetting("indices.recovery.max_concurrent_operations", 1, 1, 4, Property.Dynamic, Property.NodeScope); + /** * how long to wait before retrying after issues cause by cluster state syncing between nodes * i.e., local node is not yet known on remote node, remote shard not yet started etc. @@ -91,6 +97,7 @@ public class RecoverySettings { private volatile ByteSizeValue maxBytesPerSec; private volatile int maxConcurrentFileChunks; + private volatile int maxConcurrentOperations; private volatile SimpleRateLimiter rateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; @@ -104,6 +111,7 @@ public class RecoverySettings { public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); + this.maxConcurrentOperations = INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.get(settings); // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes) // and we want to give the master time to remove a faulty node this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings); @@ -125,6 +133,8 @@ public class RecoverySettings { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, + this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout); @@ -209,4 +219,12 @@ public class RecoverySettings { private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) { this.maxConcurrentFileChunks = maxConcurrentFileChunks; } + + public int getMaxConcurrentOperations() { + return maxConcurrentOperations; + } + + private void setMaxConcurrentOperations(int maxConcurrentOperations) 
{ + this.maxConcurrentOperations = maxConcurrentOperations; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4a5251d6d78..34f3dee8634 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -40,7 +40,6 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -53,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; @@ -87,6 +85,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.stream.StreamSupport; @@ -113,13 +112,15 @@ public class RecoverySourceHandler { private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; private final int maxConcurrentFileChunks; + private final int maxConcurrentOperations; private final ThreadPool threadPool; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, - StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) { + StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks, + int maxConcurrentOperations) { this.shard = shard; this.recoveryTarget = recoveryTarget; this.threadPool = threadPool; @@ -129,6 +130,7 @@ public class RecoverySourceHandler { this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_6_7_0) ? 
maxConcurrentFileChunks : 1; + this.maxConcurrentOperations = maxConcurrentOperations; } public StartRecoveryRequest getRequest() { @@ -321,12 +323,6 @@ public class RecoverySourceHandler { final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersionOnPrimary, sendSnapshotStep); - sendSnapshotStep.whenComplete( - r -> IOUtils.close(phase2Snapshot), - e -> { - IOUtils.closeWhileHandlingException(phase2Snapshot); - onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)); - }); }, onFailure); @@ -341,7 +337,7 @@ public class RecoverySourceHandler { final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, - prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); + prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis()); try { future.onResponse(response); } finally { @@ -668,106 +664,122 @@ public class RecoverySourceHandler { throw new IndexShardClosedException(request.shardId()); } logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); - - final AtomicInteger skippedOps = new AtomicInteger(); - final AtomicInteger totalSentOps = new AtomicInteger(); - final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch. - final CheckedSupplier, IOException> readNextBatch = () -> { - // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. - // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible. - synchronized (snapshot) { - final List ops = lastBatchCount.get() > 0 ? 
new ArrayList<>(lastBatchCount.get()) : new ArrayList<>(); - long batchSizeInBytes = 0L; - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { - skippedOps.incrementAndGet(); - continue; - } - ops.add(operation); - batchSizeInBytes += operation.estimateSize(); - totalSentOps.incrementAndGet(); - - // check if this request is past bytes threshold, and if so, send it off - if (batchSizeInBytes >= chunkSizeInBytes) { - break; - } - } - lastBatchCount.set(ops.size()); - return ops; - } - }; - final StopWatch stopWatch = new StopWatch().start(); - final ActionListener batchedListener = ActionListener.map(listener, - targetLocalCheckpoint -> { - assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() + final StepListener sendListener = new StepListener<>(); + final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener); + sendListener.whenComplete( + ignored -> { + final long skippedOps = sender.skippedOps.get(); + final int totalSentOps = sender.sentOps.get(); + final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get(); + assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); + snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps); stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); logger.trace("recovery [phase2]: took [{}]", tookTime); - return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime); - } - ); + listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime)); + }, listener::onFailure); + sender.start(); + } - sendBatch( - readNextBatch, - true, - SequenceNumbers.UNASSIGNED_SEQ_NO, + private static class OperationChunkRequest implements MultiChunkTransfer.ChunkRequest { + final List operations; + final boolean lastChunk; + + OperationChunkRequest(List operations, boolean lastChunk) { + this.operations = operations; + this.lastChunk = lastChunk; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + } + + private class OperationBatchSender extends MultiChunkTransfer { + private final long startingSeqNo; + private final long endingSeqNo; + private final Translog.Snapshot snapshot; + private final long maxSeenAutoIdTimestamp; + private final long maxSeqNoOfUpdatesOrDeletes; + private final RetentionLeases retentionLeases; + private final long mappingVersion; + private int lastBatchCount = 0; // used to estimate the count of the subsequent batch. 
+ private final AtomicInteger skippedOps = new AtomicInteger(); + private final AtomicInteger sentOps = new AtomicInteger(); + private final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + OperationBatchSender(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, long mappingVersion, + ActionListener listener) { + super(logger, threadPool.getThreadContext(), listener, maxConcurrentOperations, Collections.singletonList(snapshot)); + this.startingSeqNo = startingSeqNo; + this.endingSeqNo = endingSeqNo; + this.snapshot = snapshot; + this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; + this.retentionLeases = retentionLeases; + this.mappingVersion = mappingVersion; + } + + @Override + protected synchronized OperationChunkRequest nextChunkRequest(Translog.Snapshot snapshot) throws IOException { + // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. + // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible. + assert Transports.assertNotTransportThread("[phase2]"); + cancellableThreads.checkForCancel(); + final List ops = lastBatchCount > 0 ? new ArrayList<>(lastBatchCount) : new ArrayList<>(); + long batchSizeInBytes = 0L; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + final long seqNo = operation.seqNo(); + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + skippedOps.incrementAndGet(); + continue; + } + ops.add(operation); + batchSizeInBytes += operation.estimateSize(); + sentOps.incrementAndGet(); + + // check if this request is past bytes threshold, and if so, send it off + if (batchSizeInBytes >= chunkSizeInBytes) { + break; + } + } + lastBatchCount = ops.size(); + return new OperationChunkRequest(ops, operation == null); + } + + @Override + protected void executeChunkRequest(OperationChunkRequest request, ActionListener listener) { + cancellableThreads.checkForCancel(); + recoveryTarget.indexTranslogOperations( + request.operations, snapshot.totalOperations(), maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, - batchedListener); - } + ActionListener.delegateFailure(listener, (l, newCheckpoint) -> { + targetLocalCheckpoint.updateAndGet(curr -> SequenceNumbers.max(curr, newCheckpoint)); + l.onResponse(null); + })); + } - private void sendBatch( - final CheckedSupplier, IOException> nextBatch, - final boolean firstBatch, - final long targetLocalCheckpoint, - final int totalTranslogOps, - final long maxSeenAutoIdTimestamp, - final long maxSeqNoOfUpdatesOrDeletes, - final RetentionLeases retentionLeases, - final long mappingVersionOnPrimary, - final ActionListener listener) throws IOException { - assert ThreadPool.assertCurrentMethodIsNotCalledRecursively(); - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send translog]"); - final List operations = nextBatch.get(); - // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint - if (operations.isEmpty() == false || firstBatch) { - cancellableThreads.checkForCancel(); - recoveryTarget.indexTranslogOperations( - operations, - totalTranslogOps, - 
maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - ActionListener.wrap( - newCheckpoint -> { - sendBatch( - nextBatch, - false, - SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), - totalTranslogOps, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - listener); - }, - listener::onFailure - )); - } else { - listener.onResponse(targetLocalCheckpoint); + @Override + protected void handleError(Translog.Snapshot snapshot, Exception e) { + throw new RecoveryEngineException(shard.shardId(), 2, "failed to send/replay operations", e); + } + + @Override + public void close() throws IOException { + snapshot.close(); } } @@ -812,12 +824,12 @@ public class RecoverySourceHandler { static final class SendSnapshotResult { final long targetLocalCheckpoint; - final int totalOperations; + final int sentOperations; final TimeValue tookTime; - SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations, final TimeValue tookTime) { + SendSnapshotResult(final long targetLocalCheckpoint, final int sentOperations, final TimeValue tookTime) { this.targetLocalCheckpoint = targetLocalCheckpoint; - this.totalOperations = totalOperations; + this.sentOperations = sentOperations; this.tookTime = tookTime; } } @@ -839,7 +851,7 @@ public class RecoverySourceHandler { '}'; } - private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable { + private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { final StoreFileMetadata md; final BytesReference content; final long position; @@ -867,16 +879,16 @@ public class RecoverySourceHandler { void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first - final ThreadContext threadContext = threadPool.getThreadContext(); - final MultiFileTransfer multiFileSender = - new MultiFileTransfer(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) { + + final MultiChunkTransfermultiFileSender = new MultiChunkTransfer( + logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) { final Deque buffers = new ConcurrentLinkedDeque<>(); InputStreamIndexInput currentInput = null; long offset = 0; @Override - protected void onNewFile(StoreFileMetadata md) throws IOException { + protected void onNewResource(StoreFileMetadata md) throws IOException { offset = 0; IOUtils.close(currentInput, () -> currentInput = null); final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 18177b41734..fb5433ab8ee 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import 
org.elasticsearch.common.bytes.BytesReference; @@ -53,10 +54,12 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext; @@ -88,14 +91,18 @@ import org.junit.Before; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -180,7 +187,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), - threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); + threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5), between(1, 5)); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); sendFilesFuture.actionGet(); @@ -241,13 +248,13 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), - threadPool, request, fileChunkSizeInBytes, between(1, 10)); + threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); - assertThat(result.totalOperations, equalTo(expectedOps)); + assertThat(result.sentOperations, equalTo(expectedOps)); shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); assertThat(shippedOps.size(), equalTo(expectedOps)); for (int i = 0; i < shippedOps.size(); i++) { @@ -281,17 +288,76 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), - threadPool, request, fileChunkSizeInBytes, between(1, 10)); + threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); 
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future); if (wasFailed.get()) { - assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); + final RecoveryEngineException error = expectThrows(RecoveryEngineException.class, future::actionGet); + assertThat(error.getMessage(), equalTo("Phase[2] failed to send/replay operations")); + assertThat(error.getCause().getMessage(), equalTo("test - failed to index")); } } + public void testSendOperationsConcurrently() throws Throwable { + final IndexShard shard = mock(IndexShard.class); + when(shard.state()).thenReturn(IndexShardState.STARTED); + Set receivedSeqNos = ConcurrentCollections.newConcurrentSet(); + long maxSeenAutoIdTimestamp = randomBoolean() ? -1 : randomNonNegativeLong(); + long maxSeqNoOfUpdatesOrDeletes = randomBoolean() ? -1 : randomNonNegativeLong(); + RetentionLeases retentionLeases = new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptySet()); + long mappingVersion = randomNonNegativeLong(); + AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + int numOps = randomIntBetween(0, 1000); + AtomicBoolean received = new AtomicBoolean(); + RecoveryTargetHandler target = new TestRecoveryTargetHandler() { + @Override + public void indexTranslogOperations(List operations, int receivedTotalOps, + long receivedMaxSeenAutoIdTimestamp, long receivedMaxSeqNoOfUpdatesOrDeletes, + RetentionLeases receivedRetentionLease, long receivedMappingVersion, + ActionListener listener) { + received.set(true); + assertThat(receivedMaxSeenAutoIdTimestamp, equalTo(maxSeenAutoIdTimestamp)); + assertThat(receivedMaxSeqNoOfUpdatesOrDeletes, equalTo(maxSeqNoOfUpdatesOrDeletes)); + assertThat(receivedRetentionLease, equalTo(retentionLeases)); + assertThat(receivedMappingVersion, equalTo(mappingVersion)); + assertThat(receivedTotalOps, equalTo(numOps)); + for (Translog.Operation operation : operations) { + receivedSeqNos.add(operation.seqNo()); + } + if (randomBoolean()) { + localCheckpoint.addAndGet(randomIntBetween(1, 100)); + } + listener.onResponse(localCheckpoint.get()); + } + }; + + PlainActionFuture sendFuture = new PlainActionFuture<>(); + long startingSeqNo = randomIntBetween(0, 1000); + long endingSeqNo = startingSeqNo + randomIntBetween(0, 10000); + List operations = generateOperations(numOps); + Randomness.shuffle(operations); + List skipOperations = randomSubsetOf(operations); + Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations); + RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(target, recoveryExecutor), + threadPool, getStartRecoveryRequest(), between(1, 10 * 1024), between(1, 5), between(1, 5)); + handler.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, + mappingVersion, sendFuture); + RecoverySourceHandler.SendSnapshotResult sendSnapshotResult = sendFuture.actionGet(); + assertTrue(received.get()); + assertThat(sendSnapshotResult.targetLocalCheckpoint, equalTo(localCheckpoint.get())); + assertThat(sendSnapshotResult.sentOperations, equalTo(receivedSeqNos.size())); + Set sentSeqNos = new HashSet<>(); + for (Translog.Operation op : operations) { + if (startingSeqNo <= op.seqNo() && op.seqNo() <= endingSeqNo && skipOperations.contains(op) == false) { + 
sentSeqNos.add(op.seqNo()); + } + } + assertThat(receivedSeqNos, equalTo(sentSeqNos)); + } + private Engine.Index getIndex(final String id) { final String type = "test"; final ParseContext.Document document = new ParseContext.Document(); @@ -352,7 +418,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, - request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) { + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8)) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -409,7 +475,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, - request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) { + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4)) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -464,7 +530,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - between(1, 8)) { + between(1, 8), between(1, 8)) { @Override void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { @@ -541,7 +607,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(), - chunkSize, maxConcurrentChunks); + chunkSize, maxConcurrentChunks, between(1, 10)); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); @@ -599,7 +665,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), - threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks); + threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks, between(1, 5)); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); @@ -680,7 +746,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { }; final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest(); final RecoverySourceHandler handler = new RecoverySourceHandler( - shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4)) { + shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4), between(1, 4)) { @Override void createRetentionLease(long startingSeqNo, ActionListener listener) { final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(startRecoveryRequest.targetNode().getId()); @@ -709,7 +775,7 @@ public class 
RecoverySourceHandlerTests extends ESTestCase { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); RecoverySourceHandler handler = new RecoverySourceHandler( - shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4)); + shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4), between(1, 4)); String syncId = UUIDs.randomBase64UUID(); int numDocs = between(0, 1000); @@ -824,8 +890,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { } private Translog.Snapshot newTranslogSnapshot(List operations, List operationsToSkip) { + Iterator iterator = operations.iterator(); return new Translog.Snapshot() { - int index = 0; int skippedCount = 0; @Override @@ -840,8 +906,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { @Override public Translog.Operation next() { - while (index < operations.size()) { - Translog.Operation op = operations.get(index++); + while (iterator.hasNext()) { + Translog.Operation op = iterator.next(); if (operationsToSkip.contains(op)) { skippedCount++; } else { @@ -857,4 +923,24 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }; } + + private static List generateOperations(int numOps) { + final List operations = new ArrayList<>(numOps); + final byte[] source = "{}".getBytes(StandardCharsets.UTF_8); + final Set seqNos = new HashSet<>(); + for (int i = 0; i < numOps; i++) { + final long seqNo = randomValueOtherThanMany(n -> seqNos.add(n) == false, ESTestCase::randomNonNegativeLong); + final Translog.Operation op; + if (randomBoolean()) { + op = new Translog.Index("_doc", "id", seqNo, randomNonNegativeLong(), randomNonNegativeLong(), source, null, -1); + } else if (randomBoolean()) { + op = new Translog.Delete("_doc", "id", new Term("_id", Uid.encodeId("id")), + seqNo, randomNonNegativeLong(), randomNonNegativeLong()); + } else { + op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test"); + } + operations.add(op); + } + return operations; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 7d1bb2a0901..ab97684f19f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -42,7 +42,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; @@ -73,6 +72,7 @@ import org.elasticsearch.indices.recovery.AsyncRecoveryTarget; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryResponse; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySourceHandler; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -634,9 +634,11 @@ public abstract class IndexShardTestCase extends ESTestCase { final long startingSeqNo = 
recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode, recoveryTarget, startingSeqNo); + int fileChunkSizeInBytes = Math.toIntExact( + randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024)); final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, - request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); + request, fileChunkSizeInBytes, between(1, 8), between(1, 8)); primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable); try { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 62d6c31b86d..694807e4017 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -129,6 +129,7 @@ import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.node.NodeMocksPlugin; +import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; @@ -2051,7 +2052,9 @@ public abstract class ESIntegTestCase extends ESTestCase { mocks.add(MockFieldFilterPlugin.class); } } - + if (randomBoolean()) { + mocks.add(RecoverySettingsChunkSizePlugin.class); + } if (addMockTransportService()) { mocks.add(getTestTransportPlugin()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3b4e8dd660a..ae2f59523f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -106,6 +106,7 @@ import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeService; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; @@ -163,6 +164,7 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; import static org.elasticsearch.test.ESTestCase.inFipsJvm; +import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.NodeRoles.dataOnlyNode; import static org.elasticsearch.test.NodeRoles.masterOnlyNode; @@ -405,6 +407,12 @@ public final class InternalTestCluster extends TestCluster { RandomNumbers.randomIntBetween(random, 20, 50))); builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 1, 5)); + builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.getKey(), + 
RandomNumbers.randomIntBetween(random, 1, 4)); + if (mockPlugins.contains(RecoverySettingsChunkSizePlugin.class) && randomBoolean()) { + builder.put(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING.getKey(), + new ByteSizeValue(RandomNumbers.randomIntBetween(random, 256, 10 * 1024 * 1024))); + } defaultSettings = builder.build(); executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 866803d8a87..755683bf651 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -57,7 +57,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; -import org.elasticsearch.indices.recovery.MultiFileTransfer; +import org.elasticsearch.indices.recovery.MultiChunkTransfer; import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; @@ -517,7 +517,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit protected void restoreFiles(List filesToRecover, Store store, ActionListener allFilesListener) { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); final List mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList()); - final MultiFileTransfer multiFileTransfer = new MultiFileTransfer( + final MultiChunkTransfer multiFileTransfer = new MultiChunkTransfer( logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) { final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { @@ -525,7 +525,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit long offset = 0; @Override - protected void onNewFile(StoreFileMetadata md) { + protected void onNewResource(StoreFileMetadata md) { offset = 0; } @@ -596,7 +596,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } - private static class FileChunk implements MultiFileTransfer.ChunkRequest { + private static class FileChunk implements MultiChunkTransfer.ChunkRequest { final StoreFileMetadata md; final int bytesRequested; final boolean lastChunk;
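The transfer machinery in this patch (MultiChunkTransfer driving OperationBatchSender for phase 2, and sendFiles for phase 1) boils down to a sliding window of outstanding requests. The sketch below is not Elasticsearch code: WindowedBatchSender, the Iterator of batches, and the CompletableFuture-returning send function are hypothetical stand-ins for the AsyncIOProcessor, Translog.Snapshot, and ActionListener plumbing above. It only illustrates the idea that at most maxConcurrent batches (indices.recovery.max_concurrent_operations in phase 2) are in flight at once, and the next batch is read from the sequential source only when a response frees a slot.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class WindowedBatchSender<T> {
    private final int maxConcurrent;
    private final Iterator<List<T>> batches;                        // read sequentially, like Translog.Snapshot#next()
    private final Function<List<T>, CompletableFuture<Void>> send;  // async call, like indexTranslogOperations
    private final CompletableFuture<Void> done = new CompletableFuture<>();
    private int inFlight = 0;

    WindowedBatchSender(int maxConcurrent, Iterator<List<T>> batches,
                        Function<List<T>, CompletableFuture<Void>> send) {
        this.maxConcurrent = maxConcurrent;
        this.batches = batches;
        this.send = send;
    }

    CompletableFuture<Void> start() {
        trySend();
        return done;
    }

    private synchronized void trySend() {
        if (done.isDone()) {
            return; // a previous batch already failed; stop reading ahead
        }
        // Top up the window: never more than maxConcurrent batches outstanding at once.
        while (inFlight < maxConcurrent && batches.hasNext()) {
            final List<T> batch = batches.next(); // single-threaded read of the source, under the lock
            inFlight++;
            send.apply(batch).whenComplete((ignored, failure) -> onCompleted(failure));
        }
        if (inFlight == 0 && batches.hasNext() == false) {
            done.complete(null); // everything sent and acknowledged
        }
    }

    private void onCompleted(Throwable failure) {
        if (failure != null) {
            done.completeExceptionally(failure); // record only the first failure
            return;
        }
        synchronized (this) {
            inFlight--;
        }
        trySend(); // a response freed a slot: read ahead and send the next batch
    }
}

Reading the next batch on the response path is what lets a single, non-thread-safe snapshot feed several concurrent requests: the reads themselves stay serialized (nextChunkRequest is synchronized in the patch) while the network round trips overlap.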