[Segment Replication] Add SegmentReplicationTargetService to orchestrate replication events. (#3439)

* Add SegmentReplicationTargetService to orchestrate replication events.

This change introduces boilerplate classes for Segment Replication and a target service
to orchestrate replication events.

It also includes two refactors of peer recovery components for reuse:
1. Renames RecoveryFileChunkRequest to FileChunkRequest and extracts the file chunk handling
and throttling code into ReplicationTarget.
2. Extracts a component, RetryableTransportClient, to execute retryable requests over the transport layer.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Code cleanup.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Make the SegmentReplicationTargetService component final so that it cannot
be extended by plugins.

Signed-off-by: Marc Handalian <handalm@amazon.com>
21 changed files with 1019 additions and 209 deletions

View File

@@ -77,7 +77,7 @@ import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.SnapshotState;
@@ -397,7 +397,7 @@ public class CorruptedFileIT extends OpenSearchIntegTestCase {
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
int i = randomIntBetween(0, req.content().length() - 1);
array[i] = (byte) ~array[i]; // flip one byte in the content
@@ -474,11 +474,11 @@ public class CorruptedFileIT extends OpenSearchIntegTestCase {
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
if (truncate && req.length() > 1) {
BytesRef bytesRef = req.content().toBytesRef();
BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
request = new RecoveryFileChunkRequest(
request = new FileChunkRequest(
req.recoveryId(),
req.requestSeqNo(),
req.shardId(),

View File

@@ -67,7 +67,7 @@ import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@@ -809,7 +809,7 @@ public class RelocationIT extends OpenSearchIntegTestCase {
TransportRequestOptions options
) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
FileChunkRequest chunkRequest = (FileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recovery re-send files
logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());

View File

@@ -43,7 +43,7 @@ import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -146,7 +146,7 @@ public class TruncatedRecoveryIT extends OpenSearchIntegTestCase {
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
latch.countDown();

View File

@@ -43,11 +43,11 @@ import org.opensearch.index.store.StoreFileMetadata;
import java.io.IOException;
/**
* Request for a recovery file chunk
* Request containing a file chunk.
*
* @opensearch.internal
*/
public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
public final class FileChunkRequest extends RecoveryTransportRequest {
private final boolean lastChunk;
private final long recoveryId;
private final ShardId shardId;
@@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
private final int totalTranslogOps;
public RecoveryFileChunkRequest(StreamInput in) throws IOException {
public FileChunkRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
@@ -75,7 +75,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
sourceThrottleTimeInNanos = in.readLong();
}
public RecoveryFileChunkRequest(
public FileChunkRequest(
long recoveryId,
final long requestSeqNo,
ShardId shardId,

View File

@@ -36,20 +36,17 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
@@ -60,7 +57,6 @@ import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IllegalIndexShardStateException;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
@@ -71,7 +67,6 @@ import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
@@ -148,7 +143,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
transportService.registerRequestHandler(
Actions.FILE_CHUNK,
ThreadPool.Names.GENERIC,
RecoveryFileChunkRequest::new,
FileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
transportService.registerRequestHandler(
@@ -354,12 +349,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
}
recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener);
recoveryTarget.prepareForTranslogOperations(request.totalTranslogOps(), listener);
}
}
}
@@ -369,12 +365,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FINALIZE, request);
if (listener == null) {
return;
}
recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
recoveryTarget.finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
}
}
}
@@ -399,8 +396,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
throws IOException {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(
recoveryRef,
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(
channel,
Actions.TRANSLOG_OPS,
request,
@@ -484,13 +480,13 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
}
recoveryRef.get()
.receiveFileInfo(
recoveryTarget.receiveFileInfo(
request.phase1FileNames,
request.phase1FileSizes,
request.phase1ExistingFileNames,
@@ -506,89 +502,36 @@
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
}
recoveryRef.get()
.cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener);
}
}
}
class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> {
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
}
final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}
RateLimiter rateLimiter = recoverySettings.rateLimiter();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
recoveryTarget.writeFileChunk(
request.metadata(),
request.position(),
request.content(),
request.lastChunk(),
recoveryTarget.cleanFiles(
request.totalTranslogOps(),
request.getGlobalCheckpoint(),
request.sourceMetaSnapshot(),
listener
);
}
}
}
private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
) {
return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
}
class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {
private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);
final long requestSeqNo = request.requestSeqNo();
final ActionListener<Void> listener;
if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener);
} else {
listener = voidListener;
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
}
}
return listener;
}
class RecoveryRunner extends AbstractRunnable {

View File

@@ -260,6 +260,7 @@ public class RecoveryState implements ReplicationState, ToXContentFragment, Writ
return translog;
}
@Override
public ReplicationTimer getTimer() {
return timer;
}

View File

@@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -141,7 +142,7 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
}
@Override
public void notifyListener(Exception e, boolean sendShardFailure) {
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

View File

@@ -35,38 +35,24 @@ package org.opensearch.indices.recovery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.EmptyTransportResponseHandler;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.SendRequestTransportException;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -80,12 +66,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class);
private final TransportService transportService;
private final ThreadPool threadPool;
private final long recoveryId;
private final ShardId shardId;
private final DiscoveryNode targetNode;
private final RecoverySettings recoverySettings;
private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
private final TransportRequestOptions translogOpsRequestOptions;
private final TransportRequestOptions fileChunkRequestOptions;
@@ -94,8 +78,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final Consumer<Long> onSourceThrottle;
private final boolean retriesSupported;
private volatile boolean isCancelled = false;
private final RetryableTransportClient retryableTransportClient;
public RemoteRecoveryTargetHandler(
long recoveryId,
@@ -106,7 +89,15 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
Consumer<Long> onSourceThrottle
) {
this.transportService = transportService;
this.threadPool = transportService.getThreadPool();
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler is
// created per recovery. Any change to RecoverySettings will be applied on the next
// recovery.
this.retryableTransportClient = new RetryableTransportClient(
transportService,
targetNode,
recoverySettings.internalActionRetryTimeout(),
logger
);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetNode = targetNode;
@@ -120,7 +111,6 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
this.retriesSupported = targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0);
}
public DiscoveryNode targetNode() {
@@ -137,12 +127,9 @@
shardId,
totalTranslogOps
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(recoverySettings.internalActionTimeout())
.build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
@Override
@@ -156,12 +143,9 @@
globalCheckpoint,
trimAboveSeqNo
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
@Override
@@ -200,7 +184,7 @@
);
final Writeable.Reader<RecoveryTranslogOperationsResponse> reader = RecoveryTranslogOperationsResponse::new;
final ActionListener<RecoveryTranslogOperationsResponse> responseListener = ActionListener.map(listener, r -> r.localCheckpoint);
executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
}
@Override
@@ -224,12 +208,9 @@
phase1ExistingFileSizes,
totalTranslogOps
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(recoverySettings.internalActionTimeout())
.build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
@Override
@@ -249,12 +230,9 @@
totalTranslogOps,
globalCheckpoint
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(recoverySettings.internalActionTimeout())
.build();
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
executeRetryableAction(action, request, options, responseListener, reader);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
@Override
@@ -294,7 +272,7 @@
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be to restart the file copy again (new deltas) if too many translog ops are piling up.
*/
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
final FileChunkRequest request = new FileChunkRequest(
recoveryId,
requestSeqNo,
shardId,
@@ -306,71 +284,17 @@
throttleTimeInNanos
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader);
}
@Override
public void cancel() {
isCancelled = true;
if (onGoingRetryableActions.isEmpty()) {
return;
}
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled");
// Dispatch to generic as cancellation calls can come on the cluster state applier thread
threadPool.generic().execute(() -> {
for (RetryableAction<?> action : onGoingRetryableActions.values()) {
action.cancel(exception);
}
onGoingRetryableActions.clear();
});
}
private <T extends TransportResponse> void executeRetryableAction(
String action,
RecoveryTransportRequest request,
TransportRequestOptions options,
ActionListener<T> actionListener,
Writeable.Reader<T> reader
) {
final Object key = new Object();
final ActionListener<T> removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key));
final TimeValue initialDelay = TimeValue.timeValueMillis(200);
final TimeValue timeout = recoverySettings.internalActionRetryTimeout();
final RetryableAction<T> retryableAction = new RetryableAction<T>(logger, threadPool, initialDelay, timeout, removeListener) {
@Override
public void tryAction(ActionListener<T> listener) {
transportService.sendRequest(
targetNode,
retryableTransportClient.executeRetryableAction(
action,
request,
options,
new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)
fileChunkRequestOptions,
ActionListener.map(listener, r -> null),
reader
);
}
@Override
public boolean shouldRetry(Exception e) {
return retriesSupported && retryableException(e);
}
};
onGoingRetryableActions.put(key, retryableAction);
retryableAction.run();
if (isCancelled) {
retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled"));
}
}
private static boolean retryableException(Exception e) {
if (e instanceof ConnectTransportException) {
return true;
} else if (e instanceof SendRequestTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof ConnectTransportException;
} else if (e instanceof RemoteTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException;
}
return false;
public void cancel() {
retryableTransportClient.cancel();
}
}

View File

@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.SendRequestTransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
import java.util.Map;
/**
* Client that implements retry functionality for transport layer requests.
*
* @opensearch.internal
*/
public final class RetryableTransportClient {
private final ThreadPool threadPool;
private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
private volatile boolean isCancelled = false;
private final TransportService transportService;
private final TimeValue retryTimeout;
private final DiscoveryNode targetNode;
private final Logger logger;
public RetryableTransportClient(TransportService transportService, DiscoveryNode targetNode, TimeValue retryTimeout, Logger logger) {
this.threadPool = transportService.getThreadPool();
this.transportService = transportService;
this.retryTimeout = retryTimeout;
this.targetNode = targetNode;
this.logger = logger;
}
/**
* Execute a retryable action.
* @param action {@link String} Action Name.
* @param request {@link TransportRequest} Transport request to execute.
* @param actionListener {@link ActionListener} Listener to complete
* @param reader {@link Writeable.Reader} Reader to read the response stream.
* @param <T> {@link TransportResponse} type.
*/
public <T extends TransportResponse> void executeRetryableAction(
String action,
TransportRequest request,
ActionListener<T> actionListener,
Writeable.Reader<T> reader
) {
final TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(retryTimeout).build();
executeRetryableAction(action, request, options, actionListener, reader);
}
<T extends TransportResponse> void executeRetryableAction(
String action,
TransportRequest request,
TransportRequestOptions options,
ActionListener<T> actionListener,
Writeable.Reader<T> reader
) {
final Object key = new Object();
final ActionListener<T> removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key));
final TimeValue initialDelay = TimeValue.timeValueMillis(200);
final RetryableAction<T> retryableAction = new RetryableAction<T>(logger, threadPool, initialDelay, retryTimeout, removeListener) {
@Override
public void tryAction(ActionListener<T> listener) {
transportService.sendRequest(
targetNode,
action,
request,
options,
new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)
);
}
@Override
public boolean shouldRetry(Exception e) {
return targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0) && retryableException(e);
}
};
onGoingRetryableActions.put(key, retryableAction);
retryableAction.run();
if (isCancelled) {
retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"));
}
}
public void cancel() {
isCancelled = true;
if (onGoingRetryableActions.isEmpty()) {
return;
}
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled");
// Dispatch to generic as cancellation calls can come on the cluster state applier thread
threadPool.generic().execute(() -> {
for (RetryableAction<?> action : onGoingRetryableActions.values()) {
action.cancel(exception);
}
onGoingRetryableActions.clear();
});
}
private static boolean retryableException(Exception e) {
if (e instanceof ConnectTransportException) {
return true;
} else if (e instanceof SendRequestTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof ConnectTransportException;
} else if (e instanceof RemoteTransportException) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException;
}
return false;
}
}
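
For context, here is a minimal usage sketch of the extracted client, mirroring the RemoteRecoveryTargetHandler changes above. The wrapper class and method are illustrative only and not part of this commit; the RetryableTransportClient API calls match the diff.

// Illustrative caller (not in this commit): sends a request through the
// extracted RetryableTransportClient, which retries transient transport
// failures until the configured retry timeout elapses.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;

final class RetryableClientExample {
    private static final Logger logger = LogManager.getLogger(RetryableClientExample.class);

    static void send(
        TransportService transportService,
        DiscoveryNode targetNode,
        RecoverySettings recoverySettings,
        String action,
        TransportRequest request,
        ActionListener<Void> listener
    ) {
        RetryableTransportClient client = new RetryableTransportClient(
            transportService,
            targetNode,
            recoverySettings.internalActionRetryTimeout(),
            logger
        );
        // Empty-body responses are mapped onto the shared Empty instance.
        Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
        client.executeRetryableAction(action, request, ActionListener.map(listener, r -> null), reader);
        // client.cancel() would abort any in-flight retries, e.g. on shard close.
    }
}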

View File

@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Set;
/**
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata and SegmentInfos
* associated with a particular {@link ReplicationCheckpoint}. The {@link SegmentReplicationSource} may determine that
* the requested {@link ReplicationCheckpoint} is behind and return a different {@link ReplicationCheckpoint} in this response.
*
* @opensearch.internal
*/
public class CheckpointInfoResponse extends TransportResponse {
private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot snapshot;
private final byte[] infosBytes;
// pendingDeleteFiles are segments that have been merged away in the latest in-memory SegmentInfos
// but are still referenced by the latest commit point (segments_N).
private final Set<StoreFileMetadata> pendingDeleteFiles;
public CheckpointInfoResponse(
final ReplicationCheckpoint checkpoint,
final Store.MetadataSnapshot snapshot,
final byte[] infosBytes,
final Set<StoreFileMetadata> additionalFiles
) {
this.checkpoint = checkpoint;
this.snapshot = snapshot;
this.infosBytes = infosBytes;
this.pendingDeleteFiles = additionalFiles;
}
public CheckpointInfoResponse(StreamInput in) throws IOException {
this.checkpoint = new ReplicationCheckpoint(in);
this.snapshot = new Store.MetadataSnapshot(in);
this.infosBytes = in.readByteArray();
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
checkpoint.writeTo(out);
snapshot.writeTo(out);
out.writeByteArray(infosBytes);
out.writeCollection(pendingDeleteFiles);
}
public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
public Store.MetadataSnapshot getSnapshot() {
return snapshot;
}
public byte[] getInfosBytes() {
return infosBytes;
}
public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}

View File

@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
/**
* Response from a {@link SegmentReplicationSource} indicating that a replication event has completed.
*
* @opensearch.internal
*/
public class GetSegmentFilesResponse extends TransportResponse {
List<StoreFileMetadata> files;
public GetSegmentFilesResponse(List<StoreFileMetadata> files) {
this.files = files;
}
public GetSegmentFilesResponse(StreamInput in) throws IOException {
this.files = in.readList(StoreFileMetadata::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(files);
}
}

View File

@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.action.ActionListener;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import java.util.List;
/**
* Represents the source of a replication event.
*
* @opensearch.internal
*/
public interface SegmentReplicationSource {
/**
* Get Metadata for a ReplicationCheckpoint.
*
* @param replicationId {@link long} - ID of the replication event.
* @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for.
* @param listener {@link ActionListener} listener that completes with a {@link CheckpointInfoResponse}.
*/
void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint, ActionListener<CheckpointInfoResponse> listener);
/**
* Fetch the requested segment files. Passes a listener that completes when files are stored locally.
*
* @param replicationId {@link long} - ID of the replication event.
* @param checkpoint {@link ReplicationCheckpoint} Checkpoint associated with the files to fetch.
* @param filesToFetch {@link List} List of files to fetch.
* @param store {@link Store} Reference to the local store.
* @param listener {@link ActionListener} Listener that completes with the list of files copied.
*/
void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
);
}
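
To make the contract concrete, here is a trivial no-op implementation sketch, illustrative only and not part of this commit. It assumes Store.MetadataSnapshot.EMPTY is available; a real source would fetch checkpoint metadata and files from the primary shard.

// Illustrative no-op SegmentReplicationSource (not in this commit), e.g. for tests.
import java.util.Collections;
import java.util.List;
import org.opensearch.action.ActionListener;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

class NoOpSegmentReplicationSource implements SegmentReplicationSource {
    @Override
    public void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint,
                                      ActionListener<CheckpointInfoResponse> listener) {
        // Echo the requested checkpoint with empty metadata; a real source would
        // return the primary's latest SegmentInfos bytes and file metadata.
        listener.onResponse(
            new CheckpointInfoResponse(checkpoint, Store.MetadataSnapshot.EMPTY, new byte[0], Collections.emptySet())
        );
    }

    @Override
    public void getSegmentFiles(long replicationId, ReplicationCheckpoint checkpoint,
                                List<StoreFileMetadata> filesToFetch, Store store,
                                ActionListener<GetSegmentFilesResponse> listener) {
        // Nothing to copy; complete immediately with an empty file list.
        listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
    }
}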

View File

@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;
/**
* Factory to build {@link SegmentReplicationSource} used by {@link SegmentReplicationTargetService}.
*
* @opensearch.internal
*/
public class SegmentReplicationSourceFactory {
private TransportService transportService;
private RecoverySettings recoverySettings;
private ClusterService clusterService;
public SegmentReplicationSourceFactory(
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService
) {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
}
public SegmentReplicationSource get(IndexShard shard) {
// TODO: Default to an implementation that uses the primary shard.
return null;
}
}

View File

@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationTimer;
/**
* ReplicationState implementation to track Segment Replication events.
*
* @opensearch.internal
*/
public class SegmentReplicationState implements ReplicationState {
/**
* The stage of the replication state
*
* @opensearch.internal
*/
public enum Stage {
DONE((byte) 0),
INIT((byte) 1);
private static final Stage[] STAGES = new Stage[Stage.values().length];
static {
for (Stage stage : Stage.values()) {
assert stage.id() < STAGES.length && stage.id() >= 0;
STAGES[stage.id] = stage;
}
}
private final byte id;
Stage(byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static Stage fromId(byte id) {
if (id < 0 || id >= STAGES.length) {
throw new IllegalArgumentException("No mapping for id [" + id + "]");
}
return STAGES[id];
}
}
private Stage stage;
public SegmentReplicationState() {
this.stage = Stage.INIT;
}
@Override
public ReplicationLuceneIndex getIndex() {
// TODO
return null;
}
@Override
public ReplicationTimer getTimer() {
// TODO
return null;
}
public Stage getStage() {
return stage;
}
public void setStage(Stage stage) {
this.stage = stage;
}
}

View File

@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationTarget;
import java.io.IOException;
/**
* Represents the target of a replication event.
*
* @opensearch.internal
*/
public class SegmentReplicationTarget extends ReplicationTarget {
private final ReplicationCheckpoint checkpoint;
private final SegmentReplicationSource source;
private final SegmentReplicationState state;
public SegmentReplicationTarget(
ReplicationCheckpoint checkpoint,
IndexShard indexShard,
SegmentReplicationSource source,
SegmentReplicationTargetService.SegmentReplicationListener listener
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState();
}
@Override
protected void closeInternal() {
// TODO
}
@Override
protected String getPrefix() {
// TODO
return null;
}
@Override
protected void onDone() {
this.state.setStage(SegmentReplicationState.Stage.DONE);
}
@Override
protected void onCancel(String reason) {
// TODO
}
@Override
public ReplicationState state() {
return state;
}
@Override
public ReplicationTarget retryCopy() {
// TODO
return null;
}
@Override
public String description() {
// TODO
return null;
}
@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), e, sendShardFailure);
}
@Override
public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException {
// TODO
return false;
}
@Override
public void writeFileChunk(
StoreFileMetadata metadata,
long position,
BytesReference content,
boolean lastChunk,
int totalTranslogOps,
ActionListener<Void> listener
) {
// TODO
}
/**
* Start the Replication event.
* @param listener {@link ActionListener} listener.
*/
public void startReplication(ActionListener<Void> listener) {
// TODO
}
}

View File

@@ -0,0 +1,170 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicLong;
/**
* Service class that orchestrates replication events on replicas.
*
* @opensearch.internal
*/
public final class SegmentReplicationTargetService implements IndexEventListener {
private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);
private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;
private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
private final SegmentReplicationSourceFactory sourceFactory;
/**
* The internal actions
*
* @opensearch.internal
*/
public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk";
}
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
final TransportService transportService,
final SegmentReplicationSourceFactory sourceFactory
) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
this.onGoingReplications = new ReplicationCollection<>(logger, threadPool);
this.sourceFactory = sourceFactory;
transportService.registerRequestHandler(
Actions.FILE_CHUNK,
ThreadPool.Names.GENERIC,
FileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
}
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
onGoingReplications.cancelForShard(shardId, "shard closed");
}
}
public void startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
final SegmentReplicationListener listener
) {
startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener));
}
public void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId));
threadPool.generic().execute(new ReplicationRunner(replicationId));
}
/**
* Listener that runs on changes in Replication state
*
* @opensearch.internal
*/
public interface SegmentReplicationListener extends ReplicationListener {
@Override
default void onDone(ReplicationState state) {
onReplicationDone((SegmentReplicationState) state);
}
@Override
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure);
}
void onReplicationDone(SegmentReplicationState state);
void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure);
}
/**
* Runnable implementation to trigger a replication event.
*/
private class ReplicationRunner implements Runnable {
final long replicationId;
public ReplicationRunner(long replicationId) {
this.replicationId = replicationId;
}
@Override
public void run() {
start(replicationId);
}
}
private void start(final long replicationId) {
try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) {
replicationRef.get().startReplication(new ActionListener<>() {
@Override
public void onResponse(Void o) {
onGoingReplications.markAsDone(replicationId);
}
@Override
public void onFailure(Exception e) {
onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true);
}
});
}
}
private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
}
}
}
}
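
A minimal sketch of the intended entry point, once SegmentReplicationSourceFactory returns a real source: a caller reacting to a published checkpoint starts a replication event against this service. The wrapper class here is hypothetical; the startReplication signature and listener interface come from the diff above.

// Illustrative caller (not in this commit) that triggers a replication event.
import org.opensearch.OpenSearchException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

final class CheckpointHandlerExample {
    static void onNewCheckpoint(SegmentReplicationTargetService service, ReplicationCheckpoint checkpoint, IndexShard replicaShard) {
        service.startReplication(checkpoint, replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
            @Override
            public void onReplicationDone(SegmentReplicationState state) {
                // The replica has caught up to the requested checkpoint.
            }

            @Override
            public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
                // The failure is already reported to the shard when sendShardFailure is true.
            }
        });
    }
}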

View File

@@ -133,7 +133,7 @@ public class ReplicationCollection<T extends ReplicationTarget> {
} catch (Exception e) {
// fail shard to be safe
assert oldTarget != null;
oldTarget.notifyListener(e, true);
oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true);
return null;
}
}

View File

@@ -14,5 +14,7 @@ package org.opensearch.indices.replication.common;
* @opensearch.internal
*/
public interface ReplicationState {
ReplicationLuceneIndex getIndex();
ReplicationTimer getTimer();
}

View File

@@ -9,14 +9,25 @@
package org.opensearch.indices.replication.common;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoveryTransportRequest;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,7 +75,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted {
return cancellableThreads;
}
public abstract void notifyListener(Exception e, boolean sendShardFailure);
public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure);
public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
@@ -98,6 +109,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted {
lastAccessTime = System.nanoTime();
}
@Nullable
public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
}
@@ -172,4 +184,86 @@
}
}
@Nullable
public ActionListener<Void> createOrFinishListener(
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
) {
return createOrFinishListener(channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
}
@Nullable
public ActionListener<Void> createOrFinishListener(
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);
final long requestSeqNo = request.requestSeqNo();
final ActionListener<Void> listener;
if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
listener = markRequestReceivedAndCreateListener(requestSeqNo, voidListener);
} else {
listener = voidListener;
}
return listener;
}
/**
* Handle a FileChunkRequest for a {@link ReplicationTarget}.
*
* @param request {@link FileChunkRequest} Request containing the file chunk.
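* @param replicationTarget {@link ReplicationTarget} Target that receives and writes the chunk.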
* @param bytesSinceLastPause {@link AtomicLong} Bytes since the last pause.
* @param rateLimiter {@link RateLimiter} Rate limiter.
* @param listener {@link ActionListener} listener that completes when the chunk has been written.
* @throws IOException When there is an issue pausing the rate limiter.
*/
public void handleFileChunk(
final FileChunkRequest request,
final ReplicationTarget replicationTarget,
final AtomicLong bytesSinceLastPause,
final RateLimiter rateLimiter,
final ActionListener<Void> listener
) throws IOException {
if (listener == null) {
return;
}
final ReplicationLuceneIndex indexState = replicationTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
replicationTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
writeFileChunk(
request.metadata(),
request.position(),
request.content(),
request.lastChunk(),
request.totalTranslogOps(),
listener
);
}
public abstract void writeFileChunk(
StoreFileMetadata metadata,
long position,
BytesReference content,
boolean lastChunk,
int totalTranslogOps,
ActionListener<Void> listener
);
}
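
With throttling extracted into ReplicationTarget, the two file-chunk transport handlers in this change reduce to the same shape. A condensed, illustrative sketch of that shared pattern follows; the wrapper class is hypothetical, while the API calls are taken from the hunks above (createOrFinishListener returns null for an already-processed seqNo, which handleFileChunk tolerates).

// Illustrative handler (not in this commit) showing the shared pattern.
import java.util.concurrent.atomic.AtomicLong;
import org.opensearch.action.ActionListener;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;

final class ExampleFileChunkHandler implements TransportRequestHandler<FileChunkRequest> {
    private final ReplicationCollection<RecoveryTarget> onGoingRecoveries;
    private final RecoverySettings recoverySettings;
    // Bytes copied since the last RateLimiter.pause call, shared across chunks.
    private final AtomicLong bytesSinceLastPause = new AtomicLong();

    ExampleFileChunkHandler(ReplicationCollection<RecoveryTarget> onGoingRecoveries, RecoverySettings recoverySettings) {
        this.onGoingRecoveries = onGoingRecoveries;
        this.recoverySettings = recoverySettings;
    }

    @Override
    public void messageReceived(FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
        try (ReplicationRef<RecoveryTarget> ref = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
            RecoveryTarget target = ref.get();
            // Null when the request's seqNo was already processed; handleFileChunk handles that.
            ActionListener<Void> listener = target.createOrFinishListener(channel, PeerRecoveryTargetService.Actions.FILE_CHUNK, request);
            // Throttling, stats accounting, and the write itself now live in ReplicationTarget.
            target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
        }
    }
}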

View File

@@ -105,7 +105,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
receiveFileInfoFuture
);
receiveFileInfoFuture.actionGet();
List<RecoveryFileChunkRequest> requests = new ArrayList<>();
List<FileChunkRequest> requests = new ArrayList<>();
long seqNo = 0;
for (StoreFileMetadata md : mdFiles) {
try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {
@@ -115,7 +115,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
byte[] buffer = new byte[length];
in.readBytes(buffer, 0, length);
requests.add(
new RecoveryFileChunkRequest(
new FileChunkRequest(
0,
seqNo++,
sourceShard.shardId(),
@@ -132,7 +132,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
}
}
Randomness.shuffle(requests);
BlockingQueue<RecoveryFileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
BlockingQueue<FileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
queue.addAll(requests);
Thread[] senders = new Thread[between(1, 4)];
CyclicBarrier barrier = new CyclicBarrier(senders.length);
@@ -140,7 +140,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
senders[i] = new Thread(() -> {
try {
barrier.await();
RecoveryFileChunkRequest r;
FileChunkRequest r;
while ((r = queue.poll()) != null) {
recoveryTarget.writeFileChunk(
r.metadata(),

View File

@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private IndexShard indexShard;
private ReplicationCheckpoint checkpoint;
private SegmentReplicationSource replicationSource;
private SegmentReplicationTargetService sut;
@Override
public void setUp() throws Exception {
super.setUp();
final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
final TransportService transportService = mock(TransportService.class);
indexShard = newShard(false, settings);
checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource);
sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory);
}
@Override
public void tearDown() throws Exception {
closeShards(indexShard);
super.tearDown();
}
public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
replicationSource,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
assertEquals(SegmentReplicationState.Stage.DONE, state.getStage());
}
@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
Assert.fail();
}
}
);
final SegmentReplicationTarget spy = Mockito.spy(target);
doAnswer(invocation -> {
final ActionListener<Void> listener = invocation.getArgument(0);
listener.onResponse(null);
return null;
}).when(spy).startReplication(any());
sut.startReplication(spy);
closeShards(indexShard);
}
public void testTargetThrowsException() throws IOException {
final OpenSearchException expectedError = new OpenSearchException("Fail");
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
replicationSource,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail();
}
@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
assertEquals(SegmentReplicationState.Stage.INIT, state.getStage());
assertEquals(expectedError, e.getCause());
assertTrue(sendShardFailure);
}
}
);
final SegmentReplicationTarget spy = Mockito.spy(target);
doAnswer(invocation -> {
final ActionListener<Void> listener = invocation.getArgument(0);
listener.onFailure(expectedError);
return null;
}).when(spy).startReplication(any());
sut.startReplication(spy);
closeShards(indexShard);
}
public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
replicationSource,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
);
final SegmentReplicationTarget spy = Mockito.spy(target);
sut.startReplication(spy);
sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY);
Mockito.verify(spy, times(1)).cancel(any());
closeShards(indexShard);
}
}