Sending operations concurrently in peer recovery (#58018)
Today, we send operations in phase 2 of peer recovery batch by batch, sequentially. Normally that is fine, because phase 2 should carry a fairly small number of operations thanks to the file-based threshold. However, if phase 1 takes a long time while we are actively indexing, phase 2 can accumulate a lot of operations to replay. With this change, we send multiple batches concurrently to reduce the recovery time; the new `indices.recovery.max_concurrent_operations` setting controls the concurrency and defaults to 1, which preserves the old sequential behavior. Backport of #58018
parent 01fb7272bb · commit ef5c397c0f
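The mechanism, reduced to a minimal sketch in plain JDK Java (an illustration of the windowing idea only, not the actual `MultiChunkTransfer` code in the diff below): at most `maxConcurrent` batches are in flight at once, and the next batch is dispatched as soon as any outstanding one is acknowledged, instead of waiting out a full round-trip per batch as the old sequential loop did.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;

// Illustrative only: bounded-window batch sending. With maxConcurrent == 1
// this degenerates to the old sequential behavior.
final class WindowedBatchSender {
    private final int maxConcurrent;
    private final Semaphore inFlight;
    private final Iterator<List<String>> batches; // stand-in for translog operation batches

    WindowedBatchSender(int maxConcurrent, Iterator<List<String>> batches) {
        this.maxConcurrent = maxConcurrent;
        this.inFlight = new Semaphore(maxConcurrent);
        this.batches = batches;
    }

    void send() throws InterruptedException {
        while (batches.hasNext()) {
            inFlight.acquire();                           // wait for a free slot in the window
            sendAsync(batches.next(), inFlight::release); // slot is freed when the target acks
        }
        inFlight.acquire(maxConcurrent);                  // drain: block until every batch is acknowledged
    }

    private void sendAsync(List<String> batch, Runnable onResponse) {
        // transport call elided; a real sender would invoke onResponse on the reply thread
        onResponse.run();
    }
}
```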
@@ -48,3 +48,13 @@ sent in parallel for each recovery. Defaults to `2`.
+
You can increase the value of this setting when the recovery of a single shard
is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`.

`indices.recovery.max_concurrent_operations`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of operations sent
in parallel for each recovery. Defaults to `1`.
+
Concurrently replaying operations during recovery can be very resource-intensive
and may interfere with indexing, search, and other activities in your cluster.
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.
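Since the setting is dynamic, it can be raised on a live cluster. A minimal sketch using the Java client API, assuming an already-connected `Client` instance (the value `2` is an arbitrary example, not a recommendation):

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Sketch: apply the new recovery setting as a transient cluster setting.
public final class RecoveryTuning {
    public static void raiseConcurrentOperations(Client client) {
        client.admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .put("indices.recovery.max_concurrent_operations", 2))
            .get();
    }
}
```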
@@ -236,6 +236,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
@@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.store.StoreFileMetadata;

import java.io.Closeable;
import java.io.IOException;

@@ -57,64 +56,64 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
 * until all chunk requests are sent/responded.
 */
public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
public abstract class MultiChunkTransfer<Source, Request extends MultiChunkTransfer.ChunkRequest> implements Closeable {
private Status status = Status.PROCESSING;
private final Logger logger;
private final ActionListener<Void> listener;
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
private final AsyncIOProcessor<FileChunkResponseItem> processor;
private final int maxConcurrentFileChunks;
private StoreFileMetadata currentFile = null;
private final Iterator<StoreFileMetadata> remainingFiles;
private Tuple<StoreFileMetadata, Request> readAheadRequest = null;
private final AsyncIOProcessor<FileChunkResponseItem<Source>> processor;
private final int maxConcurrentChunks;
private Source currentSource = null;
private final Iterator<Source> remainingSources;
private Tuple<Source, Request> readAheadRequest = null;

protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
int maxConcurrentFileChunks, List<StoreFileMetadata> files) {
protected MultiChunkTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
int maxConcurrentChunks, List<Source> sources) {
this.logger = logger;
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
this.maxConcurrentChunks = maxConcurrentChunks;
this.listener = listener;
this.processor = new AsyncIOProcessor<FileChunkResponseItem>(logger, maxConcurrentFileChunks, threadContext) {
this.processor = new AsyncIOProcessor<FileChunkResponseItem<Source>>(logger, maxConcurrentChunks, threadContext) {
@Override
protected void write(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
protected void write(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) throws IOException {
handleItems(items);
}
};
this.remainingFiles = files.iterator();
this.remainingSources = sources.iterator();
}

public final void start() {
addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
}

private void addItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; });
private void addItem(long requestSeqId, Source resource, Exception failure) {
processor.put(new FileChunkResponseItem<>(requestSeqId, resource, failure), e -> { assert e == null : e; });
}

private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
private void handleItems(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) {
if (status != Status.PROCESSING) {
assert status == Status.FAILED : "must not receive any response after the transfer was completed";
// These exceptions will be ignored as we record only the first failure; log them for debugging purposes.
items.stream().filter(item -> item.v1().failure != null).forEach(item ->
logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure));
logger.debug(new ParameterizedMessage("failed to transfer a chunk request {}", item.v1().source), item.v1().failure));
return;
}
try {
for (Tuple<FileChunkResponseItem, Consumer<Exception>> item : items) {
final FileChunkResponseItem resp = item.v1();
for (Tuple<FileChunkResponseItem<Source>, Consumer<Exception>> item : items) {
final FileChunkResponseItem<Source> resp = item.v1();
if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
continue; // not an actual item
}
requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
if (resp.failure != null) {
handleError(resp.md, resp.failure);
handleError(resp.source, resp.failure);
throw resp.failure;
}
}
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) {
final Tuple<StoreFileMetadata, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentChunks) {
final Tuple<Source, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
readAheadRequest = null;
if (request == null) {
assert currentFile == null && remainingFiles.hasNext() == false;
assert currentSource == null && remainingSources.hasNext() == false;
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
onCompleted(null);
}

@@ -149,48 +148,50 @@ public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkR
listener.onResponse(null);
}

private Tuple<StoreFileMetadata, Request> getNextRequest() throws Exception {
private Tuple<Source, Request> getNextRequest() throws Exception {
try {
if (currentFile == null) {
if (remainingFiles.hasNext()) {
currentFile = remainingFiles.next();
onNewFile(currentFile);
if (currentSource == null) {
if (remainingSources.hasNext()) {
currentSource = remainingSources.next();
onNewResource(currentSource);
} else {
return null;
}
}
final StoreFileMetadata md = currentFile;
final Source md = currentSource;
final Request request = nextChunkRequest(md);
if (request.lastChunk()) {
currentFile = null;
currentSource = null;
}
return Tuple.tuple(md, request);
} catch (Exception e) {
handleError(currentFile, e);
handleError(currentSource, e);
throw e;
}
}

/**
 * This method is called when starting sending/requesting a new file. Subclasses should override
 * This method is called when starting sending/requesting a new source. Subclasses should override
 * this method to reset the file offset or close the previous file and open a new file if needed.
 */
protected abstract void onNewFile(StoreFileMetadata md) throws IOException;
protected void onNewResource(Source resource) throws IOException {

protected abstract Request nextChunkRequest(StoreFileMetadata md) throws IOException;
}

protected abstract Request nextChunkRequest(Source resource) throws IOException;

protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);

protected abstract void handleError(StoreFileMetadata md, Exception e) throws Exception;
protected abstract void handleError(Source resource, Exception e) throws Exception;

private static class FileChunkResponseItem {
private static class FileChunkResponseItem<Source> {
final long requestSeqId;
final StoreFileMetadata md;
final Source source;
final Exception failure;

FileChunkResponseItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
FileChunkResponseItem(long requestSeqId, Source source, Exception failure) {
this.requestSeqId = requestSeqId;
this.md = md;
this.source = source;
this.failure = failure;
}
}
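To make the reworked, generified API concrete, here is a minimal illustrative subclass. Only the overridden hooks (`nextChunkRequest`, `executeChunkRequest`, `handleError`, `lastChunk`, `close`) come from the class above; everything else (the class name, the `Integer` source type, the one-chunk-per-source scheme) is invented for the example.

```java
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.List;

// Illustrative only: streams each Integer "source" as a single chunk, showing
// which hooks a MultiChunkTransfer subclass must provide.
class NumberTransfer extends MultiChunkTransfer<Integer, NumberTransfer.NumberChunk> {
    static class NumberChunk implements MultiChunkTransfer.ChunkRequest {
        final int value;
        NumberChunk(int value) { this.value = value; }
        @Override
        public boolean lastChunk() { return true; } // one chunk per source
    }

    NumberTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
                   int maxConcurrentChunks, List<Integer> sources) {
        super(logger, threadContext, listener, maxConcurrentChunks, sources);
    }

    @Override
    protected NumberChunk nextChunkRequest(Integer source) {
        return new NumberChunk(source); // read the next chunk of the current source
    }

    @Override
    protected void executeChunkRequest(NumberChunk request, ActionListener<Void> listener) {
        // a real implementation would send the chunk over the transport layer
        listener.onResponse(null);
    }

    @Override
    protected void handleError(Integer source, Exception e) {
        // record or rethrow; the transfer fails on the first recorded error
    }

    @Override
    public void close() {
        // release per-source resources (none in this sketch)
    }
}
```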
@@ -320,7 +320,9 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks(),
recoverySettings.getMaxConcurrentOperations());
return Tuple.tuple(handler, recoveryTarget);
}
}
@@ -45,6 +45,12 @@ public class RecoverySettings {
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);

/**
 * Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node.
 */
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_operations", 1, 1, 4, Property.Dynamic, Property.NodeScope);

/**
 * how long to wait before retrying after issues caused by cluster state syncing between nodes
 * i.e., local node is not yet known on remote node, remote shard not yet started etc.

@@ -91,6 +97,7 @@ public class RecoverySettings {

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;

@@ -104,6 +111,7 @@ public class RecoverySettings {
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
this.maxConcurrentOperations = INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.get(settings);
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);

@@ -125,6 +133,8 @@ public class RecoverySettings {

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);

@@ -209,4 +219,12 @@ public class RecoverySettings {
private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}

public int getMaxConcurrentOperations() {
return maxConcurrentOperations;
}

private void setMaxConcurrentOperations(int maxConcurrentOperations) {
this.maxConcurrentOperations = maxConcurrentOperations;
}
}
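Taken together with the ClusterSettings hunk near the top, the RecoverySettings hunks above show the standard three-part wiring for a dynamic node setting in this codebase: declare the `Setting`, read its initial value in the constructor, and register an update consumer (plus register the constant in ClusterSettings so updates are accepted). A condensed sketch of the pattern, with the class and setting names invented for illustration:

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

// Illustration of the wiring pattern used above, not real Elasticsearch code.
public class ExampleSettings {
    // Dynamic + NodeScope: value between 1 and 4, defaulting to 1.
    public static final Setting<Integer> MAX_WIDGETS_SETTING =
        Setting.intSetting("example.max_widgets", 1, 1, 4, Property.Dynamic, Property.NodeScope);

    private volatile int maxWidgets; // volatile: read by recovery threads, written on updates

    public ExampleSettings(Settings settings, ClusterSettings clusterSettings) {
        this.maxWidgets = MAX_WIDGETS_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(MAX_WIDGETS_SETTING, v -> this.maxWidgets = v);
    }

    public int getMaxWidgets() {
        return maxWidgets;
    }
}
```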
@@ -40,7 +40,6 @@ import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;

@@ -53,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;

@@ -87,6 +85,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;

@@ -113,13 +112,15 @@ public class RecoverySourceHandler {
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;
private final int maxConcurrentOperations;
private final ThreadPool threadPool;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();

public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool,
StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks,
int maxConcurrentOperations) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.threadPool = threadPool;

@@ -129,6 +130,7 @@ public class RecoverySourceHandler {
this.chunkSizeInBytes = fileChunkSizeInBytes;
// if the target is on an old version, it won't be able to handle out-of-order file chunks.
this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_6_7_0) ? maxConcurrentFileChunks : 1;
this.maxConcurrentOperations = maxConcurrentOperations;
}

public StartRecoveryRequest getRequest() {
@@ -321,12 +323,6 @@ public class RecoverySourceHandler {
final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
IOUtils.closeWhileHandlingException(phase2Snapshot);
onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
});

}, onFailure);

@@ -341,7 +337,7 @@ public class RecoverySourceHandler {
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis());
try {
future.onResponse(response);
} finally {
@@ -668,106 +664,122 @@ public class RecoverySourceHandler {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");

final AtomicInteger skippedOps = new AtomicInteger();
final AtomicInteger totalSentOps = new AtomicInteger();
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
// We need to synchronize Snapshot#next() because it's called by different threads through sendBatch.
// Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
synchronized (snapshot) {
final List<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
long batchSizeInBytes = 0L;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps.incrementAndGet();
continue;
}
ops.add(operation);
batchSizeInBytes += operation.estimateSize();
totalSentOps.incrementAndGet();

// check if this request is past the bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
break;
}
}
lastBatchCount.set(ops.size());
return ops;
}
};

final StopWatch stopWatch = new StopWatch().start();
final ActionListener<Long> batchedListener = ActionListener.map(listener,
targetLocalCheckpoint -> {
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
final StepListener<Void> sendListener = new StepListener<>();
final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener);
sendListener.whenComplete(
ignored -> {
final long skippedOps = sender.skippedOps.get();
final int totalSentOps = sender.sentOps.get();
final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps);
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime);
}
);
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
}, listener::onFailure);
sender.start();
}

sendBatch(
readNextBatch,
true,
SequenceNumbers.UNASSIGNED_SEQ_NO,
private static class OperationChunkRequest implements MultiChunkTransfer.ChunkRequest {
final List<Translog.Operation> operations;
final boolean lastChunk;

OperationChunkRequest(List<Translog.Operation> operations, boolean lastChunk) {
this.operations = operations;
this.lastChunk = lastChunk;
}

@Override
public boolean lastChunk() {
return lastChunk;
}
}

private class OperationBatchSender extends MultiChunkTransfer<Translog.Snapshot, OperationChunkRequest> {
private final long startingSeqNo;
private final long endingSeqNo;
private final Translog.Snapshot snapshot;
private final long maxSeenAutoIdTimestamp;
private final long maxSeqNoOfUpdatesOrDeletes;
private final RetentionLeases retentionLeases;
private final long mappingVersion;
private int lastBatchCount = 0; // used to estimate the count of the subsequent batch.
private final AtomicInteger skippedOps = new AtomicInteger();
private final AtomicInteger sentOps = new AtomicInteger();
private final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

OperationBatchSender(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, long mappingVersion,
ActionListener<Void> listener) {
super(logger, threadPool.getThreadContext(), listener, maxConcurrentOperations, Collections.singletonList(snapshot));
this.startingSeqNo = startingSeqNo;
this.endingSeqNo = endingSeqNo;
this.snapshot = snapshot;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
this.retentionLeases = retentionLeases;
this.mappingVersion = mappingVersion;
}

@Override
protected synchronized OperationChunkRequest nextChunkRequest(Translog.Snapshot snapshot) throws IOException {
// We need to synchronize Snapshot#next() because it's called by different threads through sendBatch.
// Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
assert Transports.assertNotTransportThread("[phase2]");
cancellableThreads.checkForCancel();
final List<Translog.Operation> ops = lastBatchCount > 0 ? new ArrayList<>(lastBatchCount) : new ArrayList<>();
long batchSizeInBytes = 0L;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps.incrementAndGet();
continue;
}
ops.add(operation);
batchSizeInBytes += operation.estimateSize();
sentOps.incrementAndGet();

// check if this request is past the bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
break;
}
}
lastBatchCount = ops.size();
return new OperationChunkRequest(ops, operation == null);
}

@Override
protected void executeChunkRequest(OperationChunkRequest request, ActionListener<Void> listener) {
cancellableThreads.checkForCancel();
recoveryTarget.indexTranslogOperations(
request.operations,
snapshot.totalOperations(),
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
batchedListener);
}
ActionListener.delegateFailure(listener, (l, newCheckpoint) -> {
targetLocalCheckpoint.updateAndGet(curr -> SequenceNumbers.max(curr, newCheckpoint));
l.onResponse(null);
}));
}

private void sendBatch(
final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch,
final boolean firstBatch,
final long targetLocalCheckpoint,
final int totalTranslogOps,
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send translog]");
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
cancellableThreads.checkForCancel();
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
));
} else {
listener.onResponse(targetLocalCheckpoint);
@Override
protected void handleError(Translog.Snapshot snapshot, Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "failed to send/replay operations", e);
}

@Override
public void close() throws IOException {
snapshot.close();
}
}
@@ -812,12 +824,12 @@ public class RecoverySourceHandler {

static final class SendSnapshotResult {
final long targetLocalCheckpoint;
final int totalOperations;
final int sentOperations;
final TimeValue tookTime;

SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations, final TimeValue tookTime) {
SendSnapshotResult(final long targetLocalCheckpoint, final int sentOperations, final TimeValue tookTime) {
this.targetLocalCheckpoint = targetLocalCheckpoint;
this.totalOperations = totalOperations;
this.sentOperations = sentOperations;
this.tookTime = tookTime;
}
}

@@ -839,7 +851,7 @@ public class RecoverySourceHandler {
'}';
}

private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable {
private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable {
final StoreFileMetadata md;
final BytesReference content;
final long position;

@@ -867,16 +879,16 @@ public class RecoverySourceHandler {

void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first
final ThreadContext threadContext = threadPool.getThreadContext();
final MultiFileTransfer<FileChunk> multiFileSender =
new MultiFileTransfer<FileChunk>(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) {

final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileSender = new MultiChunkTransfer<StoreFileMetadata, FileChunk>(
logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {

final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
InputStreamIndexInput currentInput = null;
long offset = 0;

@Override
protected void onNewFile(StoreFileMetadata md) throws IOException {
protected void onNewResource(StoreFileMetadata md) throws IOException {
offset = 0;
IOUtils.close(currentInput, () -> currentInput = null);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
@@ -42,6 +42,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;

@@ -53,10 +54,12 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;

@@ -88,14 +91,18 @@ import org.junit.Before;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -180,7 +187,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5), between(1, 5));
PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture);
sendFilesFuture.actionGet();

@@ -241,13 +248,13 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool, request, fileChunkSizeInBytes, between(1, 10));
threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
assertThat(result.totalOperations, equalTo(expectedOps));
assertThat(result.sentOperations, equalTo(expectedOps));
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
assertThat(shippedOps.size(), equalTo(expectedOps));
for (int i = 0; i < shippedOps.size(); i++) {
@@ -281,17 +288,76 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool, request, fileChunkSizeInBytes, between(1, 10));
threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
if (wasFailed.get()) {
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
final RecoveryEngineException error = expectThrows(RecoveryEngineException.class, future::actionGet);
assertThat(error.getMessage(), equalTo("Phase[2] failed to send/replay operations"));
assertThat(error.getCause().getMessage(), equalTo("test - failed to index"));
}
}

public void testSendOperationsConcurrently() throws Throwable {
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
Set<Long> receivedSeqNos = ConcurrentCollections.newConcurrentSet();
long maxSeenAutoIdTimestamp = randomBoolean() ? -1 : randomNonNegativeLong();
long maxSeqNoOfUpdatesOrDeletes = randomBoolean() ? -1 : randomNonNegativeLong();
RetentionLeases retentionLeases = new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptySet());
long mappingVersion = randomNonNegativeLong();
AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
int numOps = randomIntBetween(0, 1000);
AtomicBoolean received = new AtomicBoolean();
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int receivedTotalOps,
long receivedMaxSeenAutoIdTimestamp, long receivedMaxSeqNoOfUpdatesOrDeletes,
RetentionLeases receivedRetentionLease, long receivedMappingVersion,
ActionListener<Long> listener) {
received.set(true);
assertThat(receivedMaxSeenAutoIdTimestamp, equalTo(maxSeenAutoIdTimestamp));
assertThat(receivedMaxSeqNoOfUpdatesOrDeletes, equalTo(maxSeqNoOfUpdatesOrDeletes));
assertThat(receivedRetentionLease, equalTo(retentionLeases));
assertThat(receivedMappingVersion, equalTo(mappingVersion));
assertThat(receivedTotalOps, equalTo(numOps));
for (Translog.Operation operation : operations) {
receivedSeqNos.add(operation.seqNo());
}
if (randomBoolean()) {
localCheckpoint.addAndGet(randomIntBetween(1, 100));
}
listener.onResponse(localCheckpoint.get());
}
};

PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> sendFuture = new PlainActionFuture<>();
long startingSeqNo = randomIntBetween(0, 1000);
long endingSeqNo = startingSeqNo + randomIntBetween(0, 10000);
List<Translog.Operation> operations = generateOperations(numOps);
Randomness.shuffle(operations);
List<Translog.Operation> skipOperations = randomSubsetOf(operations);
Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations);
RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool, getStartRecoveryRequest(), between(1, 10 * 1024), between(1, 5), between(1, 5));
handler.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases,
mappingVersion, sendFuture);
RecoverySourceHandler.SendSnapshotResult sendSnapshotResult = sendFuture.actionGet();
assertTrue(received.get());
assertThat(sendSnapshotResult.targetLocalCheckpoint, equalTo(localCheckpoint.get()));
assertThat(sendSnapshotResult.sentOperations, equalTo(receivedSeqNos.size()));
Set<Long> sentSeqNos = new HashSet<>();
for (Translog.Operation op : operations) {
if (startingSeqNo <= op.seqNo() && op.seqNo() <= endingSeqNo && skipOperations.contains(op) == false) {
sentSeqNos.add(op.seqNo());
}
}
assertThat(receivedSeqNos, equalTo(sentSeqNos));
}

private Engine.Index getIndex(final String id) {
final String type = "test";
final ParseContext.Document document = new ParseContext.Document();
@@ -352,7 +418,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) {
request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8)) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());

@@ -409,7 +475,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) {
request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4)) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());

@@ -464,7 +530,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8)) {
between(1, 8), between(1, 8)) {

@Override
void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {

@@ -541,7 +607,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final int maxConcurrentChunks = between(1, 8);
final int chunkSize = between(1, 32);
final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(),
chunkSize, maxConcurrentChunks);
chunkSize, maxConcurrentChunks, between(1, 10));
Store store = newStore(createTempDir(), false);
List<StoreFileMetadata> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();

@@ -599,7 +665,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final int maxConcurrentChunks = between(1, 4);
final int chunkSize = between(1, 16);
final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor),
threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks);
threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks, between(1, 5));
Store store = newStore(createTempDir(), false);
List<StoreFileMetadata> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();

@@ -680,7 +746,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
};
final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest();
final RecoverySourceHandler handler = new RecoverySourceHandler(
shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4)) {
shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4), between(1, 4)) {
@Override
void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> listener) {
final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(startRecoveryRequest.targetNode().getId());

@@ -709,7 +775,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
RecoverySourceHandler handler = new RecoverySourceHandler(
shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4));
shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4), between(1, 4));

String syncId = UUIDs.randomBase64UUID();
int numDocs = between(0, 1000);

@@ -824,8 +890,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}

private Translog.Snapshot newTranslogSnapshot(List<Translog.Operation> operations, List<Translog.Operation> operationsToSkip) {
Iterator<Translog.Operation> iterator = operations.iterator();
return new Translog.Snapshot() {
int index = 0;
int skippedCount = 0;

@Override

@@ -840,8 +906,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {

@Override
public Translog.Operation next() {
while (index < operations.size()) {
Translog.Operation op = operations.get(index++);
while (iterator.hasNext()) {
Translog.Operation op = iterator.next();
if (operationsToSkip.contains(op)) {
skippedCount++;
} else {

@@ -857,4 +923,24 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
}

private static List<Translog.Operation> generateOperations(int numOps) {
final List<Translog.Operation> operations = new ArrayList<>(numOps);
final byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
final Set<Long> seqNos = new HashSet<>();
for (int i = 0; i < numOps; i++) {
final long seqNo = randomValueOtherThanMany(n -> seqNos.add(n) == false, ESTestCase::randomNonNegativeLong);
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", "id", seqNo, randomNonNegativeLong(), randomNonNegativeLong(), source, null, -1);
} else if (randomBoolean()) {
op = new Translog.Delete("_doc", "id", new Term("_id", Uid.encodeId("id")),
seqNo, randomNonNegativeLong(), randomNonNegativeLong());
} else {
op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test");
}
operations.add(op);
}
return operations;
}
}
@@ -42,7 +42,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;

@@ -73,6 +72,7 @@ import org.elasticsearch.indices.recovery.AsyncRecoveryTarget;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryResponse;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoverySourceHandler;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;

@@ -634,9 +634,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
logger, rNode, recoveryTarget, startingSeqNo);
int fileChunkSizeInBytes = Math.toIntExact(
randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024));
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool,
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
request, fileChunkSizeInBytes, between(1, 8), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable);
try {

@@ -129,6 +129,7 @@ import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;

@@ -2051,7 +2052,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
mocks.add(MockFieldFilterPlugin.class);
}
}

if (randomBoolean()) {
mocks.add(RecoverySettingsChunkSizePlugin.class);
}
if (addMockTransportService()) {
mocks.add(getTestTransportPlugin());
}

@@ -106,6 +106,7 @@ import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;

@@ -163,6 +164,7 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
import static org.elasticsearch.test.ESTestCase.inFipsJvm;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterOnlyNode;

@@ -405,6 +407,12 @@ public final class InternalTestCluster extends TestCluster {
RandomNumbers.randomIntBetween(random, 20, 50)));
builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 1, 5));
builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 1, 4));
if (mockPlugins.contains(RecoverySettingsChunkSizePlugin.class) && randomBoolean()) {
builder.put(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING.getKey(),
new ByteSizeValue(RandomNumbers.randomIntBetween(random, 256, 10 * 1024 * 1024)));
}
defaultSettings = builder.build();
executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
@@ -57,7 +57,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.MultiFileTransfer;
import org.elasticsearch.indices.recovery.MultiChunkTransfer;
import org.elasticsearch.indices.recovery.MultiFileWriter;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;

@@ -517,7 +517,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final List<StoreFileMetadata> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileTransfer = new MultiChunkTransfer<StoreFileMetadata, FileChunk>(
logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {

final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {

@@ -525,7 +525,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
long offset = 0;

@Override
protected void onNewFile(StoreFileMetadata md) {
protected void onNewResource(StoreFileMetadata md) {
offset = 0;
}

@@ -596,7 +596,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
}

private static class FileChunk implements MultiFileTransfer.ChunkRequest {
private static class FileChunk implements MultiChunkTransfer.ChunkRequest {
final StoreFileMetadata md;
final int bytesRequested;
final boolean lastChunk;