[segment replication] Introducing common Replication interfaces for segment replication and recovery code paths (#3234)

* RecoveryState inherits from ReplicationState + RecoveryTarget inherits from ReplicationTarget

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* Refactoring: mixedClusterVersion error fix + move Stage to ReplicationState

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* pull ReplicationListener into a top-level class + add javadocs + address review comments

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* fix javadoc

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* review changes

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* Refactoring the hierarchy relationship between replication and recovery

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* style fix

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* move package common under replication

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* rename to replication

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* rename and doc changes

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Poojita Raj, 2022-05-23 12:19:54 -07:00, committed by GitHub
commit a023ad9cba (parent 55ca331035)
20 changed files with 751 additions and 583 deletions
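
Before the per-file diffs, here is a simplified sketch of the class relationships this change introduces; member lists are trimmed and method bodies omitted, so treat it as an orientation aid rather than the exact source:

    // New common layer under org.opensearch.indices.replication.common
    public interface ReplicationState {}                  // marker interface for a segment-copy/recovery state

    public interface ReplicationListener {
        void onDone(ReplicationState state);
        void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
    }

    public abstract class ReplicationTarget extends AbstractRefCounted {
        // id generation, lastAccessTime, request tracking, and the markAsDone()/cancel()/fail() plumbing
    }

    public class ReplicationCollection<T extends ReplicationTarget> {
        // start(target, activityTimeout), getSafe(id, shardId), markAsDone(id), fail(id, e, sendShardFailure), cancelForShard(shardId, reason)
    }

    // Peer-recovery classes now plug into that layer
    public class RecoveryState implements ReplicationState, ToXContentFragment, Writeable { /* fields unchanged */ }
    public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler { /* recovery-specific hooks */ }
    public class RecoveryListener implements ReplicationListener { /* forwards to IndicesClusterStateService */ }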

View File

@ -101,8 +101,8 @@ import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;

View File

@ -157,6 +157,7 @@ import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.repositories.RepositoriesService;
@ -2876,7 +2877,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
@ -2909,7 +2910,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
} catch (Exception e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
break;
case SNAPSHOT:
@ -2984,15 +2985,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void executeRecovery(
String reason,
RecoveryState recoveryState,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action
) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
recoveryListener.onDone(recoveryState);
}
}, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}
/**

View File

@ -136,6 +136,7 @@ import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
@ -839,7 +840,7 @@ public class IndicesService extends AbstractLifecycleComponent
public IndexShard createShard(
final ShardRouting shardRouting,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,

View File

@ -78,8 +78,9 @@ import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.PeerRecoverySourceService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
@ -624,7 +625,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
indicesService.createShard(
shardRouting,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
new RecoveryListener(shardRouting, primaryTerm, this),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
@ -739,37 +740,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
return sourceNode;
}
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;
/**
* Primary term with which the shard was created
*/
private final long primaryTerm;
private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}
@Override
public void onRecoveryDone(final RecoveryState state) {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
// package-private for testing
public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}
// package-private for testing
synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState RecState = (RecoveryState) state;
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
private void failAndRemoveShard(
@ -1004,7 +982,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
T createShard(
ShardRouting shardRouting,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
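
Net effect of this file's change: the private inner RecoveryListener class is removed, createShard(...) now receives the new top-level RecoveryListener (note the extra 'this' argument), and the done/failure callbacks land in two public handlers on the service. A sketch of those handlers as they read after the change, with the local variable renamed for readability:

    public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
        RecoveryState recoveryState = (RecoveryState) state;   // peer recovery always hands back a RecoveryState
        shardStateAction.shardStarted(
            shardRouting,
            primaryTerm,
            "after " + recoveryState.getRecoverySource(),
            SHARD_STATE_ACTION_LISTENER
        );
    }

    public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
        failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
    }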

View File

@ -37,10 +37,10 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ChannelActionListener;
@ -69,7 +69,8 @@ import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
@ -124,7 +125,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final RecoveriesCollection onGoingRecoveries;
private final ReplicationCollection<RecoveryTarget> onGoingRecoveries;
public PeerRecoveryTargetService(
ThreadPool threadPool,
@ -136,7 +137,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool);
transportService.registerRequestHandler(
Actions.FILES_INFO,
@ -185,13 +186,16 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
onGoingRecoveries.cancelForShard(shardId, "shard closed");
}
}
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
final long recoveryId = onGoingRecoveries.start(
new RecoveryTarget(indexShard, sourceNode, listener),
recoverySettings.activityTimeout()
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
@ -208,9 +212,9 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
RecoveryTarget newTarget = onGoingRecoveries.reset(recoveryId, activityTimeout);
if (newTarget != null) {
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId()));
}
}
@ -225,7 +229,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
final TransportRequest requestToSend;
final StartRecoveryRequest startRequest;
final ReplicationTimer timer;
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
@ -248,7 +252,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
} catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
onGoingRecoveries.failRecovery(
onGoingRecoveries.fail(
recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e),
true
@ -339,28 +343,17 @@ public class PeerRecoveryTargetService implements IndexEventListener {
localNode,
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
recoveryTarget.getId(),
startingSeqNo
);
return request;
}
/**
* The recovery listener
*
* @opensearch.internal
*/
public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
@ -375,7 +368,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
if (listener == null) {
return;
@ -391,7 +384,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel, Task task)
throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
recoveryRef.get().handoffPrimaryContext(request.primaryContext());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -404,7 +397,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, Task task)
throws IOException {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(
recoveryRef,
@ -424,7 +417,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
private void performTranslogOps(
final RecoveryTranslogOperationsRequest request,
final ActionListener<Void> listener,
final RecoveryRef recoveryRef
final ReplicationRef<RecoveryTarget> recoveryRef
) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
@ -439,7 +432,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void onNewClusterState(ClusterState state) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l -> {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (
ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(
request.recoveryId(),
request.shardId()
)
) {
performTranslogOps(request, listener, recoveryRef);
}
}));
@ -485,7 +483,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
@ -508,7 +506,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
@ -527,7 +525,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
@ -563,7 +561,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
private ActionListener<Void> createOrFinishListener(
final RecoveryRef recoveryRef,
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
@ -572,7 +570,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
private ActionListener<Void> createOrFinishListener(
final RecoveryRef recoveryRef,
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
@ -609,10 +607,10 @@ public class PeerRecoveryTargetService implements IndexEventListener {
@Override
public void onFailure(Exception e) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
if (recoveryRef != null) {
logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e);
onGoingRecoveries.failRecovery(
onGoingRecoveries.fail(
recoveryId,
new RecoveryFailedException(recoveryRef.get().state(), "unexpected error", e),
true // be safe
@ -648,7 +646,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
public void handleResponse(RecoveryResponse recoveryResponse) {
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
onGoingRecoveries.markAsDone(recoveryId);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[')
@ -709,11 +707,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(
recoveryId,
new RecoveryFailedException(request, "source has canceled the recovery", cause),
false
);
onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, "source has canceled the recovery", cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
@ -766,11 +760,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false);
onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, e), true);
}
@Override
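
The PeerRecoveryTargetService changes above all follow one pattern: the recovery-specific RecoveriesCollection API is replaced by the generic ReplicationCollection<RecoveryTarget>, and the method names lose their "recovery" prefix. A condensed before/after sketch with the transport-request handling elided:

    // before
    try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
        RecoveryTarget recoveryTarget = recoveryRef.get();
        // ... handle the request against recoveryTarget ...
    }
    // plus startRecovery(...), resetRecovery(...), failRecovery(...), markRecoveryAsDone(...), cancelRecoveriesForShard(...)

    // after
    try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
        RecoveryTarget recoveryTarget = recoveryRef.get();
        // ... handle the request against recoveryTarget ...
    }
    // plus start(new RecoveryTarget(...), timeout), reset(...), fail(...), markAsDone(...), cancelForShard(...)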

View File

@ -1,332 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
* of those recoveries). The class is used to guarantee concurrent semantics such that once a recoveries was done/cancelled/failed
* no other thread will be able to find it. Last, the {@link RecoveryRef} inner class verifies that recovery temporary files
* and store will only be cleared once on going usage is finished.
*
* @opensearch.internal
*/
public class RecoveriesCollection {
/** This is the single source of truth for ongoing recoveries. If it's not here, it was canceled or done */
private final ConcurrentMap<Long, RecoveryTarget> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
private final Logger logger;
private final ThreadPool threadPool;
public RecoveriesCollection(Logger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
}
/**
* Starts are new recovery for the given shard, source node and state
*
* @return the id of the new recovery.
*/
public long startRecovery(
IndexShard indexShard,
DiscoveryNode sourceNode,
PeerRecoveryTargetService.RecoveryListener listener,
TimeValue activityTimeout
) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}
private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) {
RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget);
assert existingTarget == null : "found two RecoveryStatus instances with the same id";
logger.trace(
"{} started recovery from {}, id [{}]",
recoveryTarget.shardId(),
recoveryTarget.sourceNode(),
recoveryTarget.recoveryId()
);
threadPool.schedule(
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
);
}
/**
* Resets the recovery and performs a recovery restart on the currently recovering index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created RecoveryTarget
*/
public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
RecoveryTarget oldRecoveryTarget = null;
final RecoveryTarget newRecoveryTarget;
try {
synchronized (onGoingRecoveries) {
// swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by
// cancelRecoveriesForShard whenever the old recovery target is picked up
oldRecoveryTarget = onGoingRecoveries.remove(recoveryId);
if (oldRecoveryTarget == null) {
return null;
}
newRecoveryTarget = oldRecoveryTarget.retryCopy();
startRecoveryInternal(newRecoveryTarget, activityTimeout);
}
// Closes the current recovery target
boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.cancellableThreads());
if (successfulReset) {
logger.trace(
"{} restarted recovery from {}, id [{}], previous id [{}]",
newRecoveryTarget.shardId(),
newRecoveryTarget.sourceNode(),
newRecoveryTarget.recoveryId(),
oldRecoveryTarget.recoveryId()
);
return newRecoveryTarget;
} else {
logger.trace(
"{} recovery could not be reset as it is already cancelled, recovery from {}, id [{}], previous id [{}]",
newRecoveryTarget.shardId(),
newRecoveryTarget.sourceNode(),
newRecoveryTarget.recoveryId(),
oldRecoveryTarget.recoveryId()
);
cancelRecovery(newRecoveryTarget.recoveryId(), "recovery cancelled during reset");
return null;
}
} catch (Exception e) {
// fail shard to be safe
oldRecoveryTarget.notifyListener(new RecoveryFailedException(oldRecoveryTarget.state(), "failed to retry recovery", e), true);
return null;
}
}
public RecoveryTarget getRecoveryTarget(long id) {
return onGoingRecoveries.get(id);
}
/**
* gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p>
* Returns null if recovery is not found
*/
public RecoveryRef getRecovery(long id) {
RecoveryTarget status = onGoingRecoveries.get(id);
if (status != null && status.tryIncRef()) {
return new RecoveryRef(status);
}
return null;
}
/** Similar to {@link #getRecovery(long)} but throws an exception if no recovery is found */
public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
RecoveryRef recoveryRef = getRecovery(id);
if (recoveryRef == null) {
throw new IndexShardClosedException(shardId);
}
assert recoveryRef.get().shardId().equals(shardId);
return recoveryRef;
}
/** cancel the recovery with the given id (if found) and remove it from the recovery collection */
public boolean cancelRecovery(long id, String reason) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
boolean cancelled = false;
if (removed != null) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
removed.shardId(),
removed.sourceNode(),
removed.recoveryId(),
reason
);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}
/**
* fail the recovery with the given id (if found) and remove it from the recovery collection
*
* @param id id of the recovery to fail
* @param e exception with reason for the failure
* @param sendShardFailure true a shard failed message should be sent to the master
*/
public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace(
"{} failing recovery from {}, id [{}]. Send shard failure: [{}]",
removed.shardId(),
removed.sourceNode(),
removed.recoveryId(),
sendShardFailure
);
removed.fail(e, sendShardFailure);
}
}
/** mark the recovery with the given id as done (if found) */
public void markRecoveryAsDone(long id) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId());
removed.markAsDone();
}
}
/** the number of ongoing recoveries */
public int size() {
return onGoingRecoveries.size();
}
/**
* cancel all ongoing recoveries for the given shard
*
* @param reason reason for cancellation
* @param shardId shardId for which to cancel recoveries
* @return true if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
boolean cancelled = false;
List<RecoveryTarget> matchedRecoveries = new ArrayList<>();
synchronized (onGoingRecoveries) {
for (Iterator<RecoveryTarget> it = onGoingRecoveries.values().iterator(); it.hasNext();) {
RecoveryTarget status = it.next();
if (status.shardId().equals(shardId)) {
matchedRecoveries.add(status);
it.remove();
}
}
}
for (RecoveryTarget removed : matchedRecoveries) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
removed.shardId(),
removed.sourceNode(),
removed.recoveryId(),
reason
);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}
/**
* a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveryRef#close()} is called.
*
* @opensearch.internal
*/
public static class RecoveryRef extends AutoCloseableRefCounted<RecoveryTarget> {
/**
* Important: {@link RecoveryTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public RecoveryRef(RecoveryTarget status) {
super(status);
status.setLastAccessTime();
}
}
private class RecoveryMonitor extends AbstractRunnable {
private final long recoveryId;
private final TimeValue checkInterval;
private volatile long lastSeenAccessTime;
private RecoveryMonitor(long recoveryId, long lastSeenAccessTime, TimeValue checkInterval) {
this.recoveryId = recoveryId;
this.checkInterval = checkInterval;
this.lastSeenAccessTime = lastSeenAccessTime;
}
@Override
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected error while monitoring recovery [{}]", recoveryId), e);
}
@Override
protected void doRun() throws Exception {
RecoveryTarget status = onGoingRecoveries.get(recoveryId);
if (status == null) {
logger.trace("[monitor] no status found for [{}], shutting down", recoveryId);
return;
}
long accessTime = status.lastAccessTime();
if (accessTime == lastSeenAccessTime) {
String message = "no activity after [" + checkInterval + "]";
failRecovery(
recoveryId,
new RecoveryFailedException(status.state(), message, new OpenSearchTimeoutException(message)),
true // to be safe, we don't know what go stuck
);
return;
}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
}
}
}

View File

@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.recovery;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
/**
* Listener that runs on changes in Recovery state
*
* @opensearch.internal
*/
public class RecoveryListener implements ReplicationListener {
/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;
/**
* Primary term with which the shard was created
*/
private final long primaryTerm;
private final IndicesClusterStateService indicesClusterStateService;
public RecoveryListener(
final ShardRouting shardRouting,
final long primaryTerm,
IndicesClusterStateService indicesClusterStateService
) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
this.indicesClusterStateService = indicesClusterStateService;
}
@Override
public void onDone(ReplicationState state) {
indicesClusterStateService.handleRecoveryDone(state, shardRouting, primaryTerm);
}
@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}

View File

@ -45,6 +45,7 @@ import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
@ -56,7 +57,7 @@ import java.util.Locale;
*
* @opensearch.internal
*/
public class RecoveryState implements ToXContentFragment, Writeable {
public class RecoveryState implements ReplicationState, ToXContentFragment, Writeable {
/**
* The stage of the recovery state

View File

@ -32,22 +32,18 @@
package org.opensearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.ReplicationTracker;
@ -56,48 +52,33 @@ import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardNotRecoveringException;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationCollection;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link RecoveriesCollection}.
* this class are created through {@link ReplicationCollection}.
*
* @opensearch.internal
*/
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
private final Logger logger;
private static final AtomicLong idGenerator = new AtomicLong();
public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler {
private static final String RECOVERY_PREFIX = "recovery.";
private final ShardId shardId;
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
private final AtomicBoolean finished = new AtomicBoolean();
private final CancellableThreads cancellableThreads;
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();
protected final MultiFileWriter multiFileWriter;
protected final Store store;
// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);
@ -109,27 +90,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(
indexShard.store(),
indexShard.recoveryState().getIndex(),
tempFilePrefix,
logger,
this::ensureRefCount
);
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
this.store = indexShard.store();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
store.incRef();
}
/**
@ -141,23 +110,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
return new RecoveryTarget(indexShard, sourceNode, listener);
}
public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
}
public long recoveryId() {
return recoveryId;
}
public ShardId shardId() {
return shardId;
}
public IndexShard indexShard() {
ensureRefCount();
return indexShard;
}
public String source() {
return sourceNode.toString();
}
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
@ -170,29 +131,29 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
return cancellableThreads;
}
/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
}
/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}
public Store store() {
ensureRefCount();
return store;
}
public String description() {
return "recovery from " + source();
}
@Override
public void notifyListener(Exception e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}
/**
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
*/
boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException {
public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException {
final long recoveryId = getId();
if (finished.compareAndSet(false, true)) {
try {
logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId);
logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now.
decRef();
@ -202,7 +163,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace(
"new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close",
shardId,
shardId(),
recoveryId
);
return false;
@ -248,22 +209,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @param sendShardFailure indicates whether to notify the cluster-manager of the shard failure
*/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}
public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) {
listener.onRecoveryFailure(state(), e, sendShardFailure);
super.fail(e, sendShardFailure);
}
/** mark the current recovery as done */
@ -278,7 +224,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onRecoveryDone(state());
listener.onDone(state());
}
}
@ -287,7 +233,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
try {
multiFileWriter.close();
} finally {
// free store. increment happens in constructor
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
@ -296,15 +241,28 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
return shardId() + " [" + getId() + "]";
}
private void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
"RecoveryStatus is used but it's refcount is 0. Probably a mismatch between incRef/decRef " + "calls"
);
}
@Override
protected String getPrefix() {
return RECOVERY_PREFIX;
}
@Override
protected void onDone() {
assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed";
// this might still throw an exception ie. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
indexShard.postRecovery("peer recovery done");
}
/**
* if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
}
/*** Implementation of {@link RecoveryTargetHandler } */
@ -374,7 +332,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
throw new IndexShardNotRecoveringException(shardId(), indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
@ -460,7 +418,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
globalCheckpoint,
shardId,
shardId(),
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
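
To summarize the RecoveryTarget diff: id generation, last-access tracking, the request tracker, the finished flag, and the markAsDone/cancel/fail plumbing all move into the ReplicationTarget base class, while this class keeps its file-writing state and implements the new template hooks. A sketch of what remains recovery-specific (bodies abbreviated):

    public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler {
        // kept here: multiFileWriter, store, sourceNode, closedLatch

        @Override
        protected String getPrefix() {
            return RECOVERY_PREFIX;                        // prefix for temporary recovery files
        }

        @Override
        protected void onDone() {
            indexShard.postRecovery("peer recovery done"); // invoked by the base class from markAsDone()
        }

        @Override
        protected void onCancel(String reason) {
            cancellableThreads.cancel(reason);             // invoked by the base class from cancel()
        }

        // reset(...) and retryCopy() keep their recovery-specific implementations, and
        // fail(RecoveryFailedException, boolean) now simply delegates to super.fail(...)
    }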

View File

@ -0,0 +1,297 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.indices.replication.common;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
* This class holds a collection of all ongoing replication events on the current node (i.e., the node is the target node
* of those events). The class is used to guarantee concurrent semantics such that once an event was done/cancelled/failed
* no other thread will be able to find it. Last, the {@link ReplicationRef} inner class verifies that temporary files
* and store will only be cleared once ongoing usage is finished.
*
* @opensearch.internal
*/
public class ReplicationCollection<T extends ReplicationTarget> {
/** This is the single source of truth for ongoing target events. If it's not here, it was canceled or done */
private final ConcurrentMap<Long, T> onGoingTargetEvents = ConcurrentCollections.newConcurrentMap();
private final Logger logger;
private final ThreadPool threadPool;
public ReplicationCollection(Logger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
}
/**
* Starts a new target event for the given shard, source node and state
*
* @return the id of the new target event.
*/
public long start(T target, TimeValue activityTimeout) {
startInternal(target, activityTimeout);
return target.getId();
}
private void startInternal(T target, TimeValue activityTimeout) {
T existingTarget = onGoingTargetEvents.putIfAbsent(target.getId(), target);
assert existingTarget == null : "found two Target instances with the same id";
logger.trace("started {}", target.description());
threadPool.schedule(
new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
);
}
/**
* Resets the target event and performs a restart on the current index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created Target
*/
@SuppressWarnings(value = "unchecked")
public T reset(final long id, final TimeValue activityTimeout) {
T oldTarget = null;
final T newTarget;
try {
synchronized (onGoingTargetEvents) {
// swap targets in a synchronized block to ensure that the newly added target is picked up by
// cancelForShard whenever the old target is picked up
oldTarget = onGoingTargetEvents.remove(id);
if (oldTarget == null) {
return null;
}
newTarget = (T) oldTarget.retryCopy();
startInternal(newTarget, activityTimeout);
}
// Closes the current target
boolean successfulReset = oldTarget.reset(newTarget.cancellableThreads());
if (successfulReset) {
logger.trace("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId());
return newTarget;
} else {
logger.trace(
"{} could not be reset as it is already cancelled, previous id [{}]",
newTarget.description(),
oldTarget.getId()
);
cancel(newTarget.getId(), "cancelled during reset");
return null;
}
} catch (Exception e) {
// fail shard to be safe
assert oldTarget != null;
oldTarget.notifyListener(e, true);
return null;
}
}
public T getTarget(long id) {
return onGoingTargetEvents.get(id);
}
/**
* gets the {@link ReplicationTarget } for a given id. The target returned has its ref count already incremented
* to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p>
* Returns null if target event is not found
*/
public ReplicationRef<T> get(long id) {
T status = onGoingTargetEvents.get(id);
if (status != null && status.tryIncRef()) {
return new ReplicationRef<T>(status);
}
return null;
}
/** Similar to {@link #get(long)} but throws an exception if no target is found */
public ReplicationRef<T> getSafe(long id, ShardId shardId) {
ReplicationRef<T> ref = get(id);
if (ref == null) {
throw new IndexShardClosedException(shardId);
}
assert ref.get().indexShard().shardId().equals(shardId);
return ref;
}
/** cancel the target with the given id (if found) and remove it from the target collection */
public boolean cancel(long id, String reason) {
T removed = onGoingTargetEvents.remove(id);
boolean cancelled = false;
if (removed != null) {
logger.trace("canceled {} (reason [{}])", removed.description(), reason);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}
/**
* fail the target with the given id (if found) and remove it from the target collection
*
* @param id id of the target to fail
* @param e exception with reason for the failure
* @param sendShardFailure true if a shard failed message should be sent to the master
*/
public void fail(long id, OpenSearchException e, boolean sendShardFailure) {
T removed = onGoingTargetEvents.remove(id);
if (removed != null) {
logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure);
removed.fail(e, sendShardFailure);
}
}
/** mark the target with the given id as done (if found) */
public void markAsDone(long id) {
T removed = onGoingTargetEvents.remove(id);
if (removed != null) {
logger.trace("Marking {} as done", removed.description());
removed.markAsDone();
}
}
/** the number of ongoing target events */
public int size() {
return onGoingTargetEvents.size();
}
/**
* cancel all ongoing targets for the given shard
*
* @param reason reason for cancellation
* @param shardId shardId for which to cancel targets
* @return true if a target was cancelled
*/
public boolean cancelForShard(ShardId shardId, String reason) {
boolean cancelled = false;
List<T> matchedTargets = new ArrayList<>();
synchronized (onGoingTargetEvents) {
for (Iterator<T> it = onGoingTargetEvents.values().iterator(); it.hasNext();) {
T status = it.next();
if (status.indexShard().shardId().equals(shardId)) {
matchedTargets.add(status);
it.remove();
}
}
}
for (T removed : matchedTargets) {
logger.trace("canceled {} (reason [{}])", removed.description(), reason);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}
/**
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link ReplicationRef#close()} is called.
*
* @opensearch.internal
*/
public static class ReplicationRef<T extends ReplicationTarget> extends AutoCloseableRefCounted<T> {
/**
* Important: {@link ReplicationTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public ReplicationRef(T status) {
super(status);
status.setLastAccessTime();
}
}
private class ReplicationMonitor extends AbstractRunnable {
private final long id;
private final TimeValue checkInterval;
private volatile long lastSeenAccessTime;
private ReplicationMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) {
this.id = id;
this.checkInterval = checkInterval;
this.lastSeenAccessTime = lastSeenAccessTime;
}
@Override
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected error while monitoring [{}]", id), e);
}
@Override
protected void doRun() throws Exception {
T status = onGoingTargetEvents.get(id);
if (status == null) {
logger.trace("[monitor] no status found for [{}], shutting down", id);
return;
}
long accessTime = status.lastAccessTime();
if (accessTime == lastSeenAccessTime) {
String message = "no activity after [" + checkInterval + "]";
fail(
id,
new OpenSearchTimeoutException(message),
true // to be safe, we don't know what got stuck
);
return;
}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", id, lastSeenAccessTime);
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
}
}
}
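
Putting the new collection to work, the lifecycle of a single target as driven by PeerRecoveryTargetService above looks roughly like this; a condensed sketch with error handling and the transport layer elided:

    ReplicationCollection<RecoveryTarget> onGoingRecoveries = new ReplicationCollection<>(logger, threadPool);

    // register the target and arm the inactivity monitor
    long id = onGoingRecoveries.start(new RecoveryTarget(indexShard, sourceNode, listener), activityTimeout);

    // borrow a ref-counted handle while processing a request; closing the ref releases it
    try (ReplicationCollection.ReplicationRef<RecoveryTarget> ref = onGoingRecoveries.getSafe(id, shardId)) {
        RecoveryTarget target = ref.get();
        // ... apply file chunks / translog operations ...
    }

    // a terminal call removes the target from the collection:
    onGoingRecoveries.markAsDone(id);                             // success -> listener.onDone(state)
    // onGoingRecoveries.fail(id, exception, true);               // failure -> listener.onFailure(state, e, true)
    // onGoingRecoveries.cancelForShard(shardId, "shard closed"); // cancellation; the listener is not notified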

View File

@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.opensearch.OpenSearchException;
/**
* Interface for listeners that run when there's a change in {@link ReplicationState}
*
* @opensearch.internal
*/
public interface ReplicationListener {
void onDone(ReplicationState state);
void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
}

View File

@ -30,7 +30,7 @@
* GitHub history for details.
*/
package org.opensearch.indices.recovery;
package org.opensearch.indices.replication.common;
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
@ -45,11 +45,11 @@ import java.util.Map;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
/**
* Tracks recovery requests
* Tracks replication requests
*
* @opensearch.internal
*/
public class RecoveryRequestTracker {
public class ReplicationRequestTracker {
private final Map<Long, ListenableFuture<Void>> ongoingRequests = Collections.synchronizedMap(new HashMap<>());
private final LocalCheckpointTracker checkpointTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);

View File

@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
/**
* Represents a state object used to track copying of segments from an external source
*
* @opensearch.internal
*/
public interface ReplicationState {
}

View File

@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Represents the target of a replication operation performed on a shard
*
* @opensearch.internal
*/
public abstract class ReplicationTarget extends AbstractRefCounted {
private static final AtomicLong ID_GENERATOR = new AtomicLong();
// last time the target/status was accessed
private volatile long lastAccessTime = System.nanoTime();
private final ReplicationRequestTracker requestTracker = new ReplicationRequestTracker();
private final long id;
protected final AtomicBoolean finished = new AtomicBoolean();
private final ShardId shardId;
protected final IndexShard indexShard;
protected final ReplicationListener listener;
protected final Logger logger;
protected final CancellableThreads cancellableThreads;
protected final ReplicationLuceneIndex stateIndex;
protected abstract String getPrefix();
protected abstract void onDone();
protected abstract void onCancel(String reason);
public abstract ReplicationState state();
public abstract ReplicationTarget retryCopy();
public abstract String description();
public ReplicationListener getListener() {
return listener;
}
public CancellableThreads cancellableThreads() {
return cancellableThreads;
}
public abstract void notifyListener(Exception e, boolean sendShardFailure);
public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.listener = listener;
this.id = ID_GENERATOR.incrementAndGet();
this.stateIndex = stateIndex;
this.indexShard = indexShard;
this.shardId = indexShard.shardId();
// make sure the store is not released until we are done.
this.cancellableThreads = new CancellableThreads();
}
public long getId() {
return id;
}
public abstract boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException;
/**
* return the last time this ReplicationTarget was used (based on System.nanoTime())
*/
public long lastAccessTime() {
return lastAccessTime;
}
/**
* sets the lastAccessTime flag to now
*/
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}
public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
}
public IndexShard indexShard() {
ensureRefCount();
return indexShard;
}
public ShardId shardId() {
return shardId;
}
/**
* mark the current replication as done
*/
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
try {
onDone();
} finally {
// release the initial reference. replication files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onDone(state());
}
}
/**
* cancel the replication. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()})
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
try {
logger.debug("replication cancelled (reason: [{}])", reason);
onCancel(reason);
} finally {
// release the initial reference. replication files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
/**
* fail the replication and call listener
*
* @param e exception that encapsulates the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(OpenSearchException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed" + description() + "[" + ExceptionsHelper.stackTrace(e) + "]");
} finally {
// release the initial reference. replication files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}
protected void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
"ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls"
);
}
}
}
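To make the abstract surface concrete, a minimal, hypothetical subclass is sketched below. It is not the real RecoveryTarget or a segment replication target; the state object and failure wrapping are placeholders, and the closeInternal() override assumes the hook required by AbstractRefCounted.

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.shard.IndexShard;

// Illustrative only; assumed to live in org.opensearch.indices.replication.common for brevity.
class NoopReplicationTarget extends ReplicationTarget {

    private final ReplicationState state = new ReplicationState() {
    };

    NoopReplicationTarget(IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
        super("noop_replication", indexShard, stateIndex, listener);
    }

    @Override
    protected String getPrefix() {
        return "noop.";
    }

    @Override
    protected void onDone() {
        // nothing to clean up in this sketch
    }

    @Override
    protected void onCancel(String reason) {
        // nothing to clean up in this sketch
    }

    @Override
    public ReplicationState state() {
        return state;
    }

    @Override
    public ReplicationTarget retryCopy() {
        return new NoopReplicationTarget(indexShard, stateIndex, listener);
    }

    @Override
    public String description() {
        return "noop replication to " + shardId();
    }

    @Override
    public void notifyListener(Exception e, boolean sendShardFailure) {
        getListener().onFailure(state(), new OpenSearchException("noop replication failed", e), sendShardFailure);
    }

    @Override
    public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException {
        return false; // this sketch does not support restarting
    }

    @Override
    protected void closeInternal() {
        // assumption: hook from AbstractRefCounted; release held resources here
    }
}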

View File

@ -69,9 +69,9 @@ import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import java.io.IOException;
import java.util.ArrayList;
@ -809,7 +809,7 @@ public class RecoveryDuringReplicationTests extends OpenSearchIndexLevelReplicat
CountDownLatch releaseRecovery,
IndexShard shard,
DiscoveryNode sourceNode,
PeerRecoveryTargetService.RecoveryListener listener,
ReplicationListener listener,
Logger logger
) {
super(shard, sourceNode, listener);

View File

@ -32,6 +32,7 @@
package org.opensearch.indices.cluster;
import org.junit.Before;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
@ -56,10 +57,10 @@ import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndex;
import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices;
import org.opensearch.indices.cluster.IndicesClusterStateService.Shard;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.HashMap;
@ -73,9 +74,9 @@ import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
/**
* Abstract base class for tests against {@link IndicesClusterStateService}
@ -253,7 +254,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends OpenSea
public MockIndexShard createShard(
final ShardRouting shardRouting,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,

View File

@ -41,6 +41,7 @@ import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.bulk.BulkShardRequest;
@ -68,6 +69,8 @@ import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import java.io.IOException;
import java.util.HashMap;
@ -448,20 +451,17 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase {
IndexShard replica = group.addReplica();
expectThrows(
Exception.class,
() -> group.recoverReplica(
replica,
(shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
throw new AssertionError("recovery must fail");
}
() -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new ReplicationListener() {
@Override
public void onDone(ReplicationState state) {
throw new AssertionError("recovery must fail");
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
})
)
@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
}))
);
expectThrows(AlreadyClosedException.class, () -> replica.refresh("test"));
group.removeReplica(replica);

View File

@ -36,6 +36,7 @@ import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.indices.replication.common.ReplicationRequestTracker;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
@ -44,7 +45,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
public class RecoveryRequestTrackerTests extends OpenSearchTestCase {
public class ReplicationRequestTrackerTests extends OpenSearchTestCase {
private TestThreadPool threadPool;
@ -64,7 +65,7 @@ public class RecoveryRequestTrackerTests extends OpenSearchTestCase {
Set<Long> seqNosReturned = ConcurrentCollections.newConcurrentSet();
ConcurrentMap<Long, Set<PlainActionFuture<Void>>> seqToResult = ConcurrentCollections.newConcurrentMap();
RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
ReplicationRequestTracker requestTracker = new ReplicationRequestTracker();
int numberOfRequests = randomIntBetween(100, 200);
for (int j = 0; j < numberOfRequests; ++j) {

View File

@ -38,10 +38,10 @@ import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.indices.recovery.RecoveriesCollection;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryTarget;
import java.util.concurrent.CountDownLatch;
@ -51,64 +51,58 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTestCase {
static final PeerRecoveryTargetService.RecoveryListener listener = new PeerRecoveryTargetService.RecoveryListener() {
public class ReplicationCollectionTests extends OpenSearchIndexLevelReplicationTestCase {
static final ReplicationListener listener = new ReplicationListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
public void onDone(ReplicationState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
}
};
public void testLastAccessTimeUpdate() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final ReplicationCollection<RecoveryTarget> collection = new ReplicationCollection<>(logger, threadPool);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
try (ReplicationCollection.ReplicationRef<RecoveryTarget> status = collection.get(recoveryId)) {
final long lastSeenTime = status.get().lastAccessTime();
assertBusy(() -> {
try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) {
try (ReplicationCollection.ReplicationRef<RecoveryTarget> currentStatus = collection.get(recoveryId)) {
assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.get().lastAccessTime()));
}
});
} finally {
collection.cancelRecovery(recoveryId, "life");
collection.cancel(recoveryId, "life");
}
}
}
public void testRecoveryTimeout() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final ReplicationCollection<RecoveryTarget> collection = new ReplicationCollection<>(logger, threadPool);
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final long recoveryId = startRecovery(
collection,
shards.getPrimaryNode(),
shards.addReplica(),
new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
latch.countDown();
}
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new ReplicationListener() {
@Override
public void onDone(ReplicationState state) {
latch.countDown();
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
failed.set(true);
latch.countDown();
}
},
TimeValue.timeValueMillis(100)
);
@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
failed.set(true);
latch.countDown();
}
}, TimeValue.timeValueMillis(100));
try {
latch.await(30, TimeUnit.SECONDS);
assertTrue("recovery failed to timeout", failed.get());
} finally {
collection.cancelRecovery(recoveryId, "meh");
collection.cancel(recoveryId, "meh");
}
}
@ -116,16 +110,16 @@ public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTe
public void testRecoveryCancellation() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final ReplicationCollection<RecoveryTarget> collection = new ReplicationCollection<>(logger, threadPool);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
ShardId shardId = recoveryRef.get().shardId();
assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
try (ReplicationCollection.ReplicationRef<RecoveryTarget> recoveryRef = collection.get(recoveryId)) {
ShardId shardId = recoveryRef.get().indexShard().shardId();
assertTrue("failed to cancel recoveries", collection.cancelForShard(shardId, "test"));
assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
} finally {
collection.cancelRecovery(recoveryId, "meh");
collection.cancelRecovery(recoveryId2, "meh");
collection.cancel(recoveryId, "meh");
collection.cancel(recoveryId2, "meh");
}
}
}
@ -135,17 +129,17 @@ public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTe
shards.startAll();
int numDocs = randomIntBetween(1, 15);
shards.indexDocs(numDocs);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final ReplicationCollection<RecoveryTarget> collection = new ReplicationCollection<>(logger, threadPool);
IndexShard shard = shards.addReplica();
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard);
RecoveryTarget recoveryTarget = collection.getRecoveryTarget(recoveryId);
RecoveryTarget recoveryTarget = collection.getTarget(recoveryId);
final int currentAsTarget = shard.recoveryStats().currentAsTarget();
final int referencesToStore = recoveryTarget.store().refCount();
IndexShard indexShard = recoveryTarget.indexShard();
Store store = recoveryTarget.store();
String tempFileName = recoveryTarget.getTempNameForFile("foobar");
RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60));
final long resetRecoveryId = resetRecovery.recoveryId();
RecoveryTarget resetRecovery = collection.reset(recoveryId, TimeValue.timeValueMinutes(60));
final long resetRecoveryId = resetRecovery.getId();
assertNotSame(recoveryTarget, resetRecovery);
assertNotSame(recoveryTarget.cancellableThreads(), resetRecovery.cancellableThreads());
assertSame(indexShard, resetRecovery.indexShard());
@ -158,31 +152,31 @@ public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTe
String resetTempFileName = resetRecovery.getTempNameForFile("foobar");
assertNotEquals(tempFileName, resetTempFileName);
assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget());
try (RecoveriesCollection.RecoveryRef newRecoveryRef = collection.getRecovery(resetRecoveryId)) {
try (ReplicationCollection.ReplicationRef<RecoveryTarget> newRecoveryRef = collection.get(resetRecoveryId)) {
shards.recoverReplica(shard, (s, n) -> {
assertSame(s, newRecoveryRef.get().indexShard());
return newRecoveryRef.get();
}, false);
}
shards.assertAllEqual(numDocs);
assertNull("recovery is done", collection.getRecovery(recoveryId));
assertNull("recovery is done", collection.get(recoveryId));
}
}
long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard shard) {
long startRecovery(ReplicationCollection<RecoveryTarget> collection, DiscoveryNode sourceNode, IndexShard shard) {
return startRecovery(collection, sourceNode, shard, listener, TimeValue.timeValueMinutes(60));
}
long startRecovery(
RecoveriesCollection collection,
ReplicationCollection<RecoveryTarget> collection,
DiscoveryNode sourceNode,
IndexShard indexShard,
PeerRecoveryTargetService.RecoveryListener listener,
ReplicationListener listener,
TimeValue timeValue
) {
final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId());
indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode));
indexShard.prepareForIndexRecovery();
return collection.startRecovery(indexShard, sourceNode, listener, timeValue);
return collection.start(new RecoveryTarget(indexShard, sourceNode, listener), timeValue);
}
}

View File

@ -34,6 +34,7 @@ package org.opensearch.index.shard;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.index.IndexRequest;
@ -93,6 +94,8 @@ import org.opensearch.indices.recovery.RecoverySourceHandler;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.recovery.StartRecoveryRequest;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase;
@ -138,14 +141,14 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
}
};
protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
protected static final ReplicationListener recoveryListener = new ReplicationListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
public void onDone(ReplicationState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
throw new AssertionError(e);
}
};