A follow-up of #55353 to avoid copying file chunks before sending them to the network layer. Relates #55353
This commit is contained in:
parent
39ba06cbb2
commit
60d097e262
|
@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
|
@ -65,17 +64,14 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
private final TransportService transportService;
|
||||
private final IndicesService indicesService;
|
||||
private final RecoverySettings recoverySettings;
|
||||
private final BigArrays bigArrays;
|
||||
|
||||
final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
|
||||
|
||||
@Inject
|
||||
public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService,
|
||||
RecoverySettings recoverySettings, BigArrays bigArrays) {
|
||||
public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, RecoverySettings recoverySettings) {
|
||||
this.transportService = transportService;
|
||||
this.indicesService = indicesService;
|
||||
this.recoverySettings = recoverySettings;
|
||||
this.bigArrays = bigArrays;
|
||||
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
|
||||
new StartRecoveryTransportRequestHandler());
|
||||
}
|
||||
|
@ -225,7 +221,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
|
||||
RecoverySourceHandler handler;
|
||||
final RemoteRecoveryTargetHandler recoveryTarget =
|
||||
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, bigArrays,
|
||||
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
|
||||
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
|
||||
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
|
||||
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
|
||||
|
|
|
@ -78,9 +78,11 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -830,23 +832,30 @@ public class RecoverySourceHandler {
|
|||
'}';
|
||||
}
|
||||
|
||||
private static class FileChunk implements MultiFileTransfer.ChunkRequest {
|
||||
private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable {
|
||||
final StoreFileMetadata md;
|
||||
final BytesReference content;
|
||||
final long position;
|
||||
final boolean lastChunk;
|
||||
final Releasable onClose;
|
||||
|
||||
FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk) {
|
||||
FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) {
|
||||
this.md = md;
|
||||
this.content = content;
|
||||
this.position = position;
|
||||
this.lastChunk = lastChunk;
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean lastChunk() {
|
||||
return lastChunk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
onClose.close();
|
||||
}
|
||||
}
|
||||
|
||||
void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
|
||||
|
@ -855,7 +864,7 @@ public class RecoverySourceHandler {
|
|||
final MultiFileTransfer<FileChunk> multiFileSender =
|
||||
new MultiFileTransfer<FileChunk>(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) {
|
||||
|
||||
final byte[] buffer = new byte[chunkSizeInBytes];
|
||||
final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
|
||||
InputStreamIndexInput currentInput = null;
|
||||
long offset = 0;
|
||||
|
||||
|
@ -872,16 +881,26 @@ public class RecoverySourceHandler {
|
|||
};
|
||||
}
|
||||
|
||||
private byte[] acquireBuffer() {
|
||||
final byte[] buffer = buffers.pollFirst();
|
||||
if (buffer != null) {
|
||||
return buffer;
|
||||
}
|
||||
return new byte[chunkSizeInBytes];
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
|
||||
assert Transports.assertNotTransportThread("read file chunk");
|
||||
cancellableThreads.checkForCancel();
|
||||
final byte[] buffer = acquireBuffer();
|
||||
final int bytesRead = currentInput.read(buffer);
|
||||
if (bytesRead == -1) {
|
||||
throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
|
||||
}
|
||||
final boolean lastChunk = offset + bytesRead == md.length();
|
||||
final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk);
|
||||
final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk,
|
||||
() -> buffers.addFirst(buffer));
|
||||
offset += bytesRead;
|
||||
return chunk;
|
||||
}
|
||||
|
@ -890,7 +909,8 @@ public class RecoverySourceHandler {
|
|||
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
|
||||
cancellableThreads.checkForCancel();
|
||||
recoveryTarget.writeFileChunk(
|
||||
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
|
||||
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(),
|
||||
ActionListener.runBefore(listener, request::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -30,10 +30,8 @@ import org.elasticsearch.action.support.RetryableAction;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
@ -65,7 +63,6 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
|
|||
private final ThreadPool threadPool;
|
||||
private final long recoveryId;
|
||||
private final ShardId shardId;
|
||||
private final BigArrays bigArrays;
|
||||
private final DiscoveryNode targetNode;
|
||||
private final RecoverySettings recoverySettings;
|
||||
private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
|
||||
|
@ -78,13 +75,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
|
|||
private final Consumer<Long> onSourceThrottle;
|
||||
private volatile boolean isCancelled = false;
|
||||
|
||||
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, BigArrays bigArrays,
|
||||
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
|
||||
DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
|
||||
this.transportService = transportService;
|
||||
this.threadPool = transportService.getThreadPool();
|
||||
this.recoveryId = recoveryId;
|
||||
this.shardId = shardId;
|
||||
this.bigArrays = bigArrays;
|
||||
this.targetNode = targetNode;
|
||||
this.recoverySettings = recoverySettings;
|
||||
this.onSourceThrottle = onSourceThrottle;
|
||||
|
@ -208,29 +204,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
|
|||
}
|
||||
|
||||
final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK;
|
||||
final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(content.length(), bigArrays);
|
||||
boolean actionStarted = false;
|
||||
try {
|
||||
content.writeTo(output);
|
||||
/* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
|
||||
* see how many translog ops we accumulate while copying files across the network. A future optimization
|
||||
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
|
||||
*/
|
||||
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata,
|
||||
position, output.bytes(), lastChunk, totalTranslogOps, throttleTimeInNanos);
|
||||
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
|
||||
recoveryId, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
|
||||
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
|
||||
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
|
||||
final ActionListener<TransportResponse.Empty> releaseListener = ActionListener.runBefore(responseListener, output::close);
|
||||
executeRetryableAction(action, request, fileChunkRequestOptions, releaseListener, reader);
|
||||
actionStarted = true;
|
||||
} catch (IOException e) {
|
||||
// Since the content data is buffer in memory, we should never get an exception.
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
if (actionStarted == false) {
|
||||
output.close();
|
||||
}
|
||||
}
|
||||
executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -606,7 +606,7 @@ public class Node implements Closeable {
|
|||
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
|
||||
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
|
||||
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
|
||||
indicesService, recoverySettings, bigArrays));
|
||||
indicesService, recoverySettings));
|
||||
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
|
||||
transportService, recoverySettings, clusterService));
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery;
|
|||
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
|
@ -40,8 +39,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
|
|||
IndexShard primary = newStartedShard(true);
|
||||
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
|
||||
mock(TransportService.class), mock(IndicesService.class),
|
||||
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
BigArrays.NON_RECYCLING_INSTANCE);
|
||||
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
|
||||
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
|
|
|
@ -1475,7 +1475,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
repositoriesService,
|
||||
mock(SearchService.class),
|
||||
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
|
||||
new PeerRecoverySourceService(transportService, indicesService, recoverySettings, bigArrays),
|
||||
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
|
||||
snapshotShardsService,
|
||||
new PrimaryReplicaSyncer(
|
||||
transportService,
|
||||
|
|
Loading…
Reference in New Issue