diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 736a6e4c110..521c8c8f1a3 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; @@ -65,17 +64,14 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem private final TransportService transportService; private final IndicesService indicesService; private final RecoverySettings recoverySettings; - private final BigArrays bigArrays; final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); @Inject - public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, - RecoverySettings recoverySettings, BigArrays bigArrays) { + public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, RecoverySettings recoverySettings) { this.transportService = transportService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; - this.bigArrays = bigArrays; transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new, new StartRecoveryTransportRequestHandler()); } @@ -225,7 +221,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) { RecoverySourceHandler handler; final RemoteRecoveryTargetHandler recoveryTarget = - new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, bigArrays, + new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks()); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d5141e65ce6..5b83b0664c6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -78,9 +78,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.List; import java.util.Locale; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -830,23 +832,30 @@ public class RecoverySourceHandler { '}'; } - private static class FileChunk implements MultiFileTransfer.ChunkRequest { + private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable { final StoreFileMetadata md; final BytesReference content; final long position; final boolean lastChunk; + final Releasable onClose; - FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk) { + FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { this.md = md; this.content = content; this.position = position; this.lastChunk = lastChunk; + this.onClose = onClose; } @Override public boolean lastChunk() { return lastChunk; } + + @Override + public void close() { + onClose.close(); + } } void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { @@ -855,7 +864,7 @@ public class RecoverySourceHandler { final MultiFileTransfer multiFileSender = new MultiFileTransfer(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) { - final byte[] buffer = new byte[chunkSizeInBytes]; + final Deque buffers = new ConcurrentLinkedDeque<>(); InputStreamIndexInput currentInput = null; long offset = 0; @@ -872,16 +881,26 @@ public class RecoverySourceHandler { }; } + private byte[] acquireBuffer() { + final byte[] buffer = buffers.pollFirst(); + if (buffer != null) { + return buffer; + } + return new byte[chunkSizeInBytes]; + } + @Override protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { assert Transports.assertNotTransportThread("read file chunk"); cancellableThreads.checkForCancel(); + final byte[] buffer = acquireBuffer(); final int bytesRead = currentInput.read(buffer); if (bytesRead == -1) { throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); } final boolean lastChunk = offset + bytesRead == md.length(); - final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk); + final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk, + () -> buffers.addFirst(buffer)); offset += bytesRead; return chunk; } @@ -890,7 +909,8 @@ public class RecoverySourceHandler { protected void executeChunkRequest(FileChunk request, ActionListener listener) { cancellableThreads.checkForCancel(); recoveryTarget.writeFileChunk( - request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener); + request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), + ActionListener.runBefore(listener, request::close)); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 11ecb782079..b8e63961c32 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -30,10 +30,8 @@ import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -65,7 +63,6 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final ThreadPool threadPool; private final long recoveryId; private final ShardId shardId; - private final BigArrays bigArrays; private final DiscoveryNode targetNode; private final RecoverySettings recoverySettings; private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); @@ -78,13 +75,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final Consumer onSourceThrottle; private volatile boolean isCancelled = false; - public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, BigArrays bigArrays, + public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer onSourceThrottle) { this.transportService = transportService; this.threadPool = transportService.getThreadPool(); this.recoveryId = recoveryId; this.shardId = shardId; - this.bigArrays = bigArrays; this.targetNode = targetNode; this.recoverySettings = recoverySettings; this.onSourceThrottle = onSourceThrottle; @@ -208,29 +204,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK; - final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(content.length(), bigArrays); - boolean actionStarted = false; - try { - content.writeTo(output); - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. - */ - final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata, - position, output.bytes(), lastChunk, totalTranslogOps, throttleTimeInNanos); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = ActionListener.map(listener, r -> null); - final ActionListener releaseListener = ActionListener.runBefore(responseListener, output::close); - executeRetryableAction(action, request, fileChunkRequestOptions, releaseListener, reader); - actionStarted = true; - } catch (IOException e) { - // Since the content data is buffer in memory, we should never get an exception. - throw new AssertionError(e); - } finally { - if (actionStarted == false) { - output.close(); - } - } + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. + */ + final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest( + recoveryId, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader); } @Override diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index d0a03893c23..c3b69015eae 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -606,7 +606,7 @@ public class Node implements Closeable { RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService, - indicesService, recoverySettings, bigArrays)); + indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index de46f0d9a5d..491c3974e5b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -40,8 +39,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase { IndexShard primary = newStartedShard(true); PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService( mock(TransportService.class), mock(IndicesService.class), - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - BigArrays.NON_RECYCLING_INSTANCE); + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), SequenceNumbers.UNASSIGNED_SEQ_NO); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index b0804aa7cee..829e3baee78 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1475,7 +1475,7 @@ public class SnapshotResiliencyTests extends ESTestCase { repositoriesService, mock(SearchService.class), new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), - new PeerRecoverySourceService(transportService, indicesService, recoverySettings, bigArrays), + new PeerRecoverySourceService(transportService, indicesService, recoverySettings), snapshotShardsService, new PrimaryReplicaSyncer( transportService,