From 15aa3764a47bf6c9fd99dc5e0713acb12105d55c Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Mon, 14 Jan 2019 15:14:46 -0500
Subject: [PATCH] Reduce recovery time with compress or secure transport (#36981)

Today file-chunks are sent sequentially one by one in peer-recovery. This is a
reasonable choice since the implementation is straightforward and recovery is
network-bound most of the time. However, if the connection is encrypted, we
might not be able to saturate the network pipe because encrypting/decrypting
is CPU-bound rather than network-bound.

With this commit, a source node can send multiple file-chunks (two by default)
without waiting for acknowledgments from the target.

Below are the benchmark results for PMC and NYC_taxis.

- PMC (20.2 GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| --------- | -------- | -------- | -------- | -------- | -------- |
| Plain     | 184s     | 137s     | 106s     | 105s     | 106s     |
| TLS       | 346s     | 294s     | 176s     | 153s     | 117s     |
| Compress  | 1556s    | 1407s    | 1193s    | 1183s    | 1211s    |

- NYC_Taxis (38.6 GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| --------- | -------- | -------- | -------- | -------- | -------- |
| Plain     | 321s     | 249s     | 191s     | *        | *        |
| TLS       | 618s     | 539s     | 323s     | 290s     | 213s     |
| Compress  | 2622s    | 2421s    | 2018s    | 2029s    | n/a      |

Relates #33844
---
 .../modules/indices/recovery.asciidoc         |   9 +
 .../common/settings/ClusterSettings.java      |   1 +
 .../recovery/PeerRecoverySourceService.java   |   3 +-
 .../recovery/PeerRecoveryTargetService.java   |  15 +-
 .../indices/recovery/RecoverySettings.java    |  17 +
 .../recovery/RecoverySourceHandler.java       | 145 ++++----
 .../indices/recovery/RecoveryTarget.java      |  70 +++-
 .../recovery/RecoveryTargetHandler.java       |   4 +-
 .../recovery/RemoteRecoveryTargetHandler.java |  10 +-
 .../PeerRecoveryTargetServiceTests.java       |  86 +++++
 .../recovery/RecoverySourceHandlerTests.java  | 336 +++++++++++++++---
 .../elasticsearch/recovery/RelocationIT.java  |   2 +-
 .../index/shard/IndexShardTestCase.java       |   5 +-
 .../test/InternalTestCluster.java             |   2 +
 14 files changed, 560 insertions(+), 145 deletions(-)

diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc
index d9e034941f8..d9e85c27105 100644
--- a/docs/reference/modules/indices/recovery.asciidoc
+++ b/docs/reference/modules/indices/recovery.asciidoc
@@ -20,5 +20,14 @@ peer recoveries:
     consume an excess of bandwidth (or other resources) which could destabilize
     the cluster. Defaults to `40mb`.
 
+`indices.recovery.max_concurrent_file_chunks`::
+    Controls the number of file chunk requests that can be sent in parallel per recovery.
+    As multiple recoveries are already running in parallel (controlled by
+    cluster.routing.allocation.node_concurrent_recoveries), increasing this expert-level
+    setting might only help in situations where peer recovery of a single shard is not
+    reaching the total inbound and outbound peer recovery traffic as configured by
+    indices.recovery.max_bytes_per_sec, but is CPU-bound instead, typically when using
+    transport-level security or compression. Defaults to `2`.
+
 This setting can be dynamically updated on a live cluster with the
 <> API.
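Before the Java changes, here is a minimal, self-contained sketch of the back-pressure scheme the commit message describes: the source keeps at most `max_concurrent_file_chunks` chunk requests in flight and sends the next chunk only after an earlier one is acknowledged. This is illustrative only; the class name, the `Semaphore`, and the simulated target are assumptions for the example. The actual `RecoverySourceHandler` change below tracks in-flight requests with a `LocalCheckpointTracker` and non-blocking `ActionListener` callbacks rather than a blocking semaphore.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ConcurrentChunkSenderSketch {
    public static void main(String[] args) throws InterruptedException {
        final int totalChunks = 10;
        final int maxConcurrentChunks = 2;      // plays the role of indices.recovery.max_concurrent_file_chunks
        final Semaphore inFlight = new Semaphore(maxConcurrentChunks);
        final CountDownLatch allAcked = new CountDownLatch(totalChunks);
        final ScheduledExecutorService target = Executors.newScheduledThreadPool(2);

        for (int chunk = 0; chunk < totalChunks; chunk++) {
            final int chunkNo = chunk;
            inFlight.acquire();                 // blocks once maxConcurrentChunks chunks are unacknowledged
            System.out.println("sending chunk " + chunkNo);
            // simulate the network round trip: the target acknowledges asynchronously after ~50ms
            target.schedule(() -> {
                System.out.println("acked chunk " + chunkNo);
                inFlight.release();             // frees a slot so the sender can push the next chunk
                allAcked.countDown();
            }, 50, TimeUnit.MILLISECONDS);
        }
        allAcked.await();                       // akin to waiting for the seq-no tracker to reach the last issued seq-no
        target.shutdown();
    }
}
```

Widening this window helps only when the sender is CPU-bound (TLS or compression); with a plain transport the network itself is usually the bottleneck, which matches the benchmark numbers above.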
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9b2388aa4b7..f0331ad2e30 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -214,6 +214,7 @@ public final class ClusterSettings extends AbstractScopedSettings { RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, + RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 9c30ab156c0..556df71ca2c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -176,7 +176,8 @@ public class PeerRecoverySourceService implements IndexEventListener { final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); - handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt()); + handler = new RecoverySourceHandler(shard, recoveryTarget, request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks()); return handler; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8e52a05e2ac..8edaf0ef093 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -29,6 +29,8 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -602,8 +604,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { @Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() - )) { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.target(); final RecoveryState.Index indexState = 
recoveryTarget.state().getIndex(); if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) { @@ -621,12 +622,12 @@ public class PeerRecoveryTargetService implements IndexEventListener { recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); } } - - recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), - request.lastChunk(), request.totalTranslogOps() - ); + final ActionListener listener = + new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request); + recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(), + request.totalTranslogOps(), + ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } - channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 31ecd4455b1..3db04dec1d6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -39,6 +39,12 @@ public class RecoverySettings { Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Property.Dynamic, Property.NodeScope); + /** + * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. + */ + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING = + Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope); + /** * how long to wait before retrying after issues cause by cluster state syncing between nodes * i.e., local node is not yet known on remote node, remote shard not yet started etc. 
@@ -78,6 +84,7 @@ public class RecoverySettings { public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile ByteSizeValue maxBytesPerSec; + private volatile int maxConcurrentFileChunks; private volatile SimpleRateLimiter rateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; @@ -89,6 +96,7 @@ public class RecoverySettings { public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); + this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes) // and we want to give the master time to remove a faulty node this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings); @@ -108,6 +116,7 @@ public class RecoverySettings { logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout); @@ -180,4 +189,12 @@ public class RecoverySettings { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); } } + + public int getMaxConcurrentFileChunks() { + return maxConcurrentFileChunks; + } + + private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) { + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 639ab477993..bdda5e8d8d4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -44,7 +45,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -59,10 +59,9 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; -import java.io.BufferedOutputStream; 
import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -71,10 +70,12 @@ import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.StreamSupport; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; + /** * RecoverySourceHandler handles the three phases of shard recovery, which is * everything relating to copying the segment files as well as sending translog @@ -96,17 +97,19 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; + private final int maxConcurrentFileChunks; private final CancellableThreads cancellableThreads = new CancellableThreads(); - public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, - final StartRecoveryRequest request, - final int fileChunkSizeInBytes) { + public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request, + final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) { this.shard = shard; this.recoveryTarget = recoveryTarget; this.request = request; this.shardId = this.request.shardId().id(); this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); this.chunkSizeInBytes = fileChunkSizeInBytes; + // if the target is on an old version, it won't be able to handle out-of-order file chunks. + this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_7_0_0) ? maxConcurrentFileChunks : 1; } public StartRecoveryRequest getRequest() { @@ -407,10 +410,7 @@ public class RecoverySourceHandler { phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo( phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get())); - // How many bytes we've copied since we last called RateLimiter.pause - final Function outputStreamFactories = - md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes); - sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); + sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps); // Send the CLEAN_FILES request, which takes all of the files that // were transferred and renames them from their temporary file // names to the actual file names. 
It also writes checksums for @@ -649,73 +649,72 @@ public class RecoverySourceHandler { '}'; } - - final class RecoveryOutputStream extends OutputStream { - private final StoreFileMetaData md; - private final Supplier translogOps; - private long position = 0; - - RecoveryOutputStream(StoreFileMetaData md, Supplier translogOps) { - this.md = md; - this.translogOps = translogOps; - } - - @Override - public void write(int b) throws IOException { - throw new UnsupportedOperationException("we can't send single bytes over the wire"); - } - - @Override - public void write(byte[] b, int offset, int length) throws IOException { - sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length); - position += length; - assert md.length() >= position : "length: " + md.length() + " but positions was: " + position; - } - - private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { - // Actually send the file chunk to the target node, waiting for it to complete - cancellableThreads.executeIO(() -> - recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get()) - ); - if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us - throw new IndexShardClosedException(request.shardId()); + void sendFiles(Store store, StoreFileMetaData[] files, Supplier translogOps) throws Exception { + ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first + final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + final AtomicReference> error = new AtomicReference<>(); + final byte[] buffer = new byte[chunkSizeInBytes]; + for (final StoreFileMetaData md : files) { + if (error.get() != null) { + break; } + try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); + InputStream in = new InputStreamIndexInput(indexInput, md.length())) { + long position = 0; + int bytesRead; + while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) { + final BytesArray content = new BytesArray(buffer, 0, bytesRead); + final boolean lastChunk = position + content.length() == md.length(); + final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks)); + cancellableThreads.checkForCancel(); + if (error.get() != null) { + break; + } + final long requestFilePosition = position; + cancellableThreads.executeIO(() -> + recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(), + ActionListener.wrap( + r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId), + e -> { + error.compareAndSet(null, Tuple.tuple(md, e)); + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + } + ))); + position += content.length(); + } + } catch (Exception e) { + error.compareAndSet(null, Tuple.tuple(md, e)); + break; + } + } + // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway. + // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error. 
+ if (error.get() == null) { + cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo())); + } + if (error.get() != null) { + handleErrorOnSendFiles(store, error.get().v1(), error.get().v2()); } } - void sendFiles(Store store, StoreFileMetaData[] files, Function outputStreamFactory) throws Exception { - store.incRef(); - try { - ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first - for (int i = 0; i < files.length; i++) { - final StoreFileMetaData md = files[i]; - try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { - // it's fine that we are only having the indexInput in the try/with block. The copy methods handles - // exceptions during close correctly and doesn't hide the original exception. - Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md)); - } catch (Exception e) { - final IOException corruptIndexException; - if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { - if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! - logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); - failEngine(corruptIndexException); - throw corruptIndexException; - } else { // corruption has happened on the way to replica - RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " + - "checksums are ok", null); - exception.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "{} Remote file corruption on node {}, recovering {}. local checksum OK", - shardId, request.targetNode(), md), corruptIndexException); - throw exception; - } - } else { - throw e; - } - } + private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception { + final IOException corruptIndexException; + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); + failEngine(corruptIndexException); + throw corruptIndexException; + } else { // corruption has happened on the way to replica + RemoteTransportException exception = new RemoteTransportException( + "File corruption occurred on recovery but checksums are ok", null); + exception.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. 
local checksum OK", + shardId, request.targetNode(), md), corruptIndexException); + throw exception; } - } finally { - store.decRef(); + } else { + throw e; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 3a3a78941b1..54a42bcdc92 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -55,10 +56,12 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.PriorityQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,6 +92,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final AtomicBoolean finished = new AtomicBoolean(); private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap fileChunkWriters = ConcurrentCollections.newConcurrentMap(); private final CancellableThreads cancellableThreads; // last time this status was accessed @@ -340,6 +344,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } } finally { // free store. 
increment happens in constructor + fileChunkWriters.clear(); store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); @@ -487,12 +492,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } } - @Override - public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, - boolean lastChunk, int totalTranslogOps) throws IOException { + private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position, + BytesReference content, boolean lastChunk) throws IOException { final Store store = store(); final String name = fileMetaData.name(); - state().getTranslog().totalOperations(totalTranslogOps); final RecoveryState.Index indexState = state().getIndex(); IndexOutput indexOutput; if (position == 0) { @@ -500,6 +503,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } else { indexOutput = getOpenIndexOutput(name); } + assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position; BytesRefIterator iterator = content.iterator(); BytesRef scratch; while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls @@ -522,6 +526,64 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } } + @Override + public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, + boolean lastChunk, int totalTranslogOps, ActionListener listener) { + try { + state().getTranslog().totalOperations(totalTranslogOps); + final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter()); + writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk)); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private static final class FileChunk { + final StoreFileMetaData md; + final BytesReference content; + final long position; + final boolean lastChunk; + FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + } + } + + private final class FileChunkWriter { + // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them. 
+ final PriorityQueue pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position)); + long lastPosition = 0; + + void writeChunk(FileChunk newChunk) throws IOException { + synchronized (this) { + pendingChunks.add(newChunk); + } + while (true) { + final FileChunk chunk; + synchronized (this) { + chunk = pendingChunks.peek(); + if (chunk == null || chunk.position != lastPosition) { + return; + } + pendingChunks.remove(); + } + innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk); + synchronized (this) { + assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position; + lastPosition += chunk.content.length(); + if (chunk.lastChunk) { + assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]"; + fileChunkWriters.remove(chunk.md.name()); + assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed"; + } + } + } + } + } + Path translogLocation() { return indexShard().shardPath().resolveTranslog(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index b7c3de97b4e..c958665b044 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.indices.recovery; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.store.Store; @@ -27,7 +28,6 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; import java.util.List; - public interface RecoveryTargetHandler { /** @@ -90,6 +90,6 @@ public interface RecoveryTargetHandler { /** writes a partial file chunk to the target store */ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, - boolean lastChunk, int totalTranslogOps) throws IOException; + boolean lastChunk, int totalTranslogOps, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 3ad7f65db38..560d679bbe7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -21,6 +21,8 @@ package org.elasticsearch.indices.recovery; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; @@ -31,6 +33,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportFuture; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -142,8 +145,8 @@ public class RemoteRecoveryTargetHandler 
implements RecoveryTargetHandler { } @Override - public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean - lastChunk, int totalTranslogOps) throws IOException { + public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, + boolean lastChunk, int totalTranslogOps, ActionListener listener) { // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings @@ -173,7 +176,8 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ - throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>( + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), in -> TransportResponse.Empty.INSTANCE)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index b6f5a7b6451..a2ec88cf7b5 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -24,17 +24,33 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CyclicBarrier; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { @@ -110,4 +126,74 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { closeShards(replica); } } + + public void testWriteFileChunksConcurrently() throws Exception { + IndexShard sourceShard = newStartedShard(true); + int numDocs = between(20, 100); + for (int i = 0; i < numDocs; i++) { + indexDoc(sourceShard, "_doc", Integer.toString(i)); + } + sourceShard.flush(new FlushRequest()); + Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(null); + List mdFiles = 
new ArrayList<>(); + for (StoreFileMetaData md : sourceSnapshot) { + mdFiles.add(md); + } + final IndexShard targetShard = newShard(false); + final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); + final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); + targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); + final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null); + recoveryTarget.receiveFileInfo( + mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()), + mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()), + Collections.emptyList(), Collections.emptyList(), 0 + ); + List requests = new ArrayList<>(); + for (StoreFileMetaData md : mdFiles) { + try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) { + int pos = 0; + while (pos < md.length()) { + int length = between(1, Math.toIntExact(md.length() - pos)); + byte[] buffer = new byte[length]; + in.readBytes(buffer, 0, length); + requests.add(new RecoveryFileChunkRequest(0, sourceShard.shardId(), md, pos, new BytesArray(buffer), + pos + length == md.length(), 1, 1)); + pos += length; + } + } + } + Randomness.shuffle(requests); + BlockingQueue queue = new ArrayBlockingQueue<>(requests.size()); + queue.addAll(requests); + Thread[] senders = new Thread[between(1, 4)]; + CyclicBarrier barrier = new CyclicBarrier(senders.length); + for (int i = 0; i < senders.length; i++) { + senders[i] = new Thread(() -> { + try { + barrier.await(); + RecoveryFileChunkRequest r; + while ((r = queue.poll()) != null) { + recoveryTarget.writeFileChunk(r.metadata(), r.position(), r.content(), r.lastChunk(), r.totalTranslogOps(), + ActionListener.wrap(ignored -> {}, + e -> { + throw new AssertionError(e); + })); + } + } catch (Exception e) { + throw new AssertionError(e); + } + }); + senders[i].start(); + } + for (Thread sender : senders) { + sender.join(); + } + recoveryTarget.renameAllTempFiles(); + recoveryTarget.decRef(); + Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata(); + Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot); + assertThat(diff.different, empty()); + closeShards(sourceShard, targetShard); + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 17e88d2864d..7b9e8fe05da 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -32,12 +32,15 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Numbers; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ 
-59,6 +62,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -75,19 +79,29 @@ import org.elasticsearch.test.IndexSettingsModule; import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.zip.CRC32; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.core.IsNull.notNullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyObject; @@ -109,8 +123,6 @@ public class RecoverySourceHandlerTests extends ESTestCase { final RecoverySettings recoverySettings = new RecoverySettings(settings, service); final StartRecoveryRequest request = getStartRecoveryRequest(); Store store = newStore(createTempDir()); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, - recoverySettings.getChunkSize().bytesAsInt()); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -129,19 +141,38 @@ public class RecoverySourceHandlerTests extends ESTestCase { metas.add(md); } Store targetStore = newStore(createTempDir()); - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> { - try { - return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { - @Override - public void close() throws IOException { - super.close(); - targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it + RecoveryTargetHandler target = new TestRecoveryTargetHandler() { + IndexOutputOutputStream out; + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { + try { + if (position == 0) { + out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { + @Override + public void close() throws IOException { + super.close(); + targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it + } + }; } - }; - } catch (IOException e) { - throw new RuntimeException(e); + final BytesRefIterator iterator = content.iterator(); + BytesRef scratch; + while ((scratch = iterator.next()) != null) { + out.write(scratch.bytes, scratch.offset, scratch.length); + } + if (lastChunk) { + out.close(); + } + 
listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } - }); + }; + RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); @@ -176,7 +207,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { when(shard.state()).thenReturn(IndexShardState.STARTED); final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class); final RecoverySourceHandler handler = - new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes); + new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); final List operations = new ArrayList<>(); final int initialNumberOfDocs = randomIntBetween(16, 64); for (int i = 0; i < initialNumberOfDocs; i++) { @@ -283,14 +314,6 @@ public class RecoverySourceHandlerTests extends ESTestCase { Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, - recoverySettings.getChunkSize().bytesAsInt()) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -313,20 +336,46 @@ public class RecoverySourceHandlerTests extends ESTestCase { (p.getFileName().toString().equals("write.lock") || p.getFileName().toString().startsWith("extra")) == false)); Store targetStore = newStore(createTempDir(), false); - try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> { + RecoveryTargetHandler target = new TestRecoveryTargetHandler() { + IndexOutputOutputStream out; + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { try { - return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { - @Override - public void close() throws IOException { - super.close(); - store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it - } - }; - } catch (IOException e) { - throw new RuntimeException(e); + if (position == 0) { + out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { + @Override + public void close() throws IOException { + super.close(); + targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it + } + }; + } + final BytesRefIterator iterator = content.iterator(); + BytesRef scratch; + while ((scratch = iterator.next()) != null) { + out.write(scratch.bytes, scratch.offset, scratch.length); + } + if (lastChunk) { + out.close(); + } + listener.onResponse(null); + } catch (Exception e) { + IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e)); } - }); + } + }; + RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, + 
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) { + @Override + protected void failEngine(IOException cause) { + assertFalse(failedEngine.get()); + failedEngine.set(true); + } + }; + + try { + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); fail("corrupted index"); } catch (IOException ex) { assertNotNull(ExceptionsHelper.unwrapCorruption(ex)); @@ -342,14 +391,6 @@ public class RecoverySourceHandlerTests extends ESTestCase { Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, - recoverySettings.getChunkSize().bytesAsInt()) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -368,15 +409,27 @@ public class RecoverySourceHandlerTests extends ESTestCase { metas.add(md); } final boolean throwCorruptedIndexException = randomBoolean(); - Store targetStore = newStore(createTempDir(), false); - try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> { + RecoveryTargetHandler target = new TestRecoveryTargetHandler() { + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { if (throwCorruptedIndexException) { - throw new RuntimeException(new CorruptIndexException("foo", "bar")); + listener.onFailure(new RuntimeException(new CorruptIndexException("foo", "bar"))); } else { - throw new RuntimeException("boom"); + listener.onFailure(new RuntimeException("boom")); } - }); + } + }; + RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) { + @Override + protected void failEngine(IOException cause) { + assertFalse(failedEngine.get()); + failedEngine.set(true); + } + }; + try { + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); fail("exception index"); } catch (RuntimeException ex) { assertNull(ExceptionsHelper.unwrapCorruption(ex)); @@ -389,7 +442,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { fail("not expected here"); } assertFalse(failedEngine.get()); - IOUtils.close(store, targetStore); + IOUtils.close(store); } public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException { @@ -411,7 +464,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { shard, mock(RecoveryTargetHandler.class), request, - recoverySettings.getChunkSize().bytesAsInt()) { + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8)) { @Override public SendFileResult phase1(final IndexCommit snapshot, final Supplier translogOps) { @@ -468,9 +522,128 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertBusy(() -> assertTrue(freed.get())); } + public void testSendFileChunksConcurrently() throws Exception { + final IndexShard shard = mock(IndexShard.class); + when(shard.state()).thenReturn(IndexShardState.STARTED); + final List unrepliedChunks = new CopyOnWriteArrayList<>(); + final AtomicInteger sentChunks = new AtomicInteger(); + final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + final 
AtomicLong chunkNumberGenerator = new AtomicLong(); + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { + final long chunkNumber = chunkNumberGenerator.getAndIncrement(); + logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position); + unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener)); + sentChunks.incrementAndGet(); + } + }; + final int maxConcurrentChunks = between(1, 8); + final int chunkSize = between(1, 32); + final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(), + chunkSize, maxConcurrentChunks); + Store store = newStore(createTempDir(), false); + List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); + int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); + Thread sender = new Thread(() -> { + try { + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); + } catch (Exception ex) { + throw new AssertionError(ex); + } + }); + sender.start(); + assertBusy(() -> { + assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); + assertThat(unrepliedChunks, hasSize(sentChunks.get())); + }); + + List ackedChunks = new ArrayList<>(); + while (sentChunks.get() < totalChunks || unrepliedChunks.isEmpty() == false) { + List chunksToAck = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); + unrepliedChunks.removeAll(chunksToAck); + ackedChunks.addAll(chunksToAck); + ackedChunks.sort(Comparator.comparing(c -> c.chunkNumber)); + int checkpoint = -1; + for (int i = 0; i < ackedChunks.size(); i++) { + if (i != ackedChunks.get(i).chunkNumber) { + break; + } else { + checkpoint = i; + } + } + int chunksToSend = Math.min( + totalChunks - sentChunks.get(), // limited by the remaining chunks + maxConcurrentChunks - (sentChunks.get() - 1 - checkpoint)); // limited by the buffering chunks + + int expectedSentChunks = sentChunks.get() + chunksToSend; + int expectedUnrepliedChunks = unrepliedChunks.size() + chunksToSend; + chunksToAck.forEach(c -> c.listener.onResponse(null)); + assertBusy(() -> { + assertThat(sentChunks.get(), equalTo(expectedSentChunks)); + assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks)); + }); + } + sender.join(); + store.close(); + } + + public void testSendFileChunksStopOnError() throws Exception { + final IndexShard shard = mock(IndexShard.class); + when(shard.state()).thenReturn(IndexShardState.STARTED); + final List unrepliedChunks = new CopyOnWriteArrayList<>(); + final AtomicInteger sentChunks = new AtomicInteger(); + final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + final AtomicLong chunkNumberGenerator = new AtomicLong(); + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { + final long chunkNumber = chunkNumberGenerator.getAndIncrement(); + logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position); + unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener)); + sentChunks.incrementAndGet(); + } + }; + final int maxConcurrentChunks = between(1, 4); + final int chunkSize = between(1, 16); + final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(), + 
chunkSize, maxConcurrentChunks); + Store store = newStore(createTempDir(), false); + List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); + int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); + AtomicReference error = new AtomicReference<>(); + Thread sender = new Thread(() -> { + try { + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); + } catch (Exception ex) { + error.set(ex); + } + }); + sender.start(); + assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)))); + List failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); + failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception"))); + unrepliedChunks.removeAll(failedChunks); + unrepliedChunks.forEach(c -> { + if (randomBoolean()) { + c.listener.onFailure(new RuntimeException("test")); + } else { + c.listener.onResponse(null); + } + }); + assertBusy(() -> { + assertThat(error.get(), notNullValue()); + assertThat(error.get().getMessage(), containsString("test chunk exception")); + }); + assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); + sender.join(); + store.close(); + } + private Store newStore(Path path) throws IOException { return newStore(path, true); } + private Store newStore(Path path, boolean checkIndex) throws IOException { BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path); if (checkIndex == false) { @@ -479,5 +652,68 @@ public class RecoverySourceHandlerTests extends ESTestCase { return new Store(shardId, INDEX_SETTINGS, baseDirectoryWrapper, new DummyShardLock(shardId)); } + static final class FileChunkResponse { + final long chunkNumber; + final ActionListener listener; + FileChunkResponse(long chunkNumber, ActionListener listener) { + this.chunkNumber = chunkNumber; + this.listener = listener; + } + } + + private List generateFiles(Store store, int numFiles, IntSupplier fileSizeSupplier) throws IOException { + List files = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + byte[] buffer = randomByteArrayOfLength(fileSizeSupplier.getAsInt()); + CRC32 digest = new CRC32(); + digest.update(buffer, 0, buffer.length); + StoreFileMetaData md = new StoreFileMetaData("test-" + i, buffer.length + 8, + Store.digestToString(digest.getValue()), org.apache.lucene.util.Version.LATEST); + try (OutputStream out = new IndexOutputOutputStream(store.createVerifyingOutput(md.name(), md, IOContext.DEFAULT))) { + out.write(buffer); + out.write(Numbers.longToBytes(digest.getValue())); + } + store.directory().sync(Collections.singleton(md.name())); + files.add(md); + } + return files; + } + + class TestRecoveryTargetHandler implements RecoveryTargetHandler { + @Override + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) { + } + + @Override + public void finalizeRecovery(long globalCheckpoint) { + } + + @Override + public void ensureClusterStateVersion(long clusterStateVersion) { + } + + @Override + public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) { + } + + @Override + public long indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu) { + return 0; + } + + @Override + public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, + List phase1ExistingFileSizes, int 
totalTranslogOps) { + } + + @Override + public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) { + } + + @Override + public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, + int totalTranslogOps, ActionListener listener) { + } + } } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 04fb8c08a97..45f0fce3b81 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -385,7 +385,7 @@ public class RelocationIT extends ESIntegTestCase { assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut()); flush(); - int allowedFailures = randomIntBetween(3, 10); + int allowedFailures = randomIntBetween(3, 5); // the default of the `index.allocation.max_retries` is 5. logger.info("--> blocking recoveries from primary (allowed failures: [{}])", allowedFailures); CountDownLatch corruptionCount = new CountDownLatch(allowedFailures); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, p_node); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0ffc9f05ff2..273967509f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -610,10 +610,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); final RecoverySourceHandler recovery = new RecoverySourceHandler( - primary, - recoveryTarget, - request, - (int) ByteSizeUnit.MB.toBytes(1)); + primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index e4a11ad414f..c1d61125399 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -390,6 +390,8 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis( RandomNumbers.randomIntBetween(random, 20, 50))); + builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.getKey(), + RandomNumbers.randomIntBetween(random, 1, 5)); defaultSettings = builder.build(); executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
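
As a companion to the sender-side sketch after the commit message, the following is a simplified, standalone illustration of the receive-side buffering that the new `FileChunkWriter` in `RecoveryTarget` performs: chunks may arrive out of order, so they are queued by file position and flushed only once they become contiguous. Names and the in-memory output are hypothetical; the real implementation writes to a Lucene `IndexOutput`, performs the write outside the lock, and removes the writer when the last chunk lands.

```java
import java.io.ByteArrayOutputStream;
import java.util.Comparator;
import java.util.PriorityQueue;

public class OutOfOrderChunkWriterSketch {
    record Chunk(long position, byte[] content) { }

    private final PriorityQueue<Chunk> pendingChunks =
        new PriorityQueue<>(Comparator.comparingLong(Chunk::position));
    private final ByteArrayOutputStream file = new ByteArrayOutputStream(); // stand-in for an IndexOutput
    private long lastPosition = 0;

    synchronized void writeChunk(Chunk chunk) {
        pendingChunks.add(chunk);
        // drain every chunk that is now contiguous with what has already been written
        while (pendingChunks.isEmpty() == false && pendingChunks.peek().position() == lastPosition) {
            Chunk next = pendingChunks.remove();
            file.write(next.content(), 0, next.content().length);
            lastPosition += next.content().length;
        }
    }

    public static void main(String[] args) {
        OutOfOrderChunkWriterSketch writer = new OutOfOrderChunkWriterSketch();
        // chunks arrive out of order because the source sends several of them concurrently
        writer.writeChunk(new Chunk(5, "world".getBytes()));
        writer.writeChunk(new Chunk(0, "hello".getBytes()));
        System.out.println(writer.file.toString()); // prints "helloworld"
    }
}
```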