Remove wait for cluster state step in peer recovery (#40004)

We introduced the WAIT_CLUSTERSTATE action in #19287 (5.0), but stopped
using it in #25692 (6.0). This change removes that action and the related
code in 7.x and 8.0.

Relates #19287
Relates #25692
Author: Nhat Nguyen, 2019-03-18 13:26:51 -04:00
Commit: 38e9522218 (parent: 9ba0bdf528)
16 changed files with 37 additions and 151 deletions
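
In practical terms, summarized from the hunks below, the recovery code paths simply drop the cluster-state-version callback and the WAIT_CLUSTERSTATE transport handler. A minimal before/after sketch of the affected constructor calls (Java; illustrative only, since these are Elasticsearch-internal classes and the snippet is not runnable on its own, with variable names taken from the diff):

    // Before: both constructors carried a LongConsumer used to wait for a cluster state version.
    new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
    new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);

    // After: the callback parameter is gone, along with the WAIT_CLUSTERSTATE action that backed it.
    new RecoveriesCollection(logger, threadPool);
    new RecoveryTarget(indexShard, sourceNode, listener);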

PeerRecoveryTargetService.java

@ -31,7 +31,6 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -57,7 +56,6 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
@ -93,7 +91,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
}
@ -112,7 +109,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
@ -126,8 +123,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
RecoveryHandoffPrimaryContextRequest::new,
@ -452,18 +447,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
}
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
@Override
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
@Override
@ -538,46 +521,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
}
private void waitForClusterState(long clusterStateVersion) {
final ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, TimeValue.timeValueMinutes(5), logger,
threadPool.getThreadContext());
if (clusterState.getVersion() >= clusterStateVersion) {
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
clusterState.getVersion());
return;
} else {
logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion());
final PlainActionFuture<Long> future = new PlainActionFuture<>();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
future.onResponse(state.getVersion());
}
@Override
public void onClusterServiceClose() {
future.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion));
}
}, newState -> newState.getVersion() >= clusterStateVersion);
try {
long currentVersion = future.get();
logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion, currentVersion);
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage(
"failed waiting for cluster state with version {} (current: {})",
clusterStateVersion, clusterService.state().getVersion()), e);
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override

RecoveriesCollection.java

@ -36,7 +36,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
@ -51,12 +50,10 @@ public class RecoveriesCollection {
private final Logger logger;
private final ThreadPool threadPool;
private final LongConsumer ensureClusterStateVersionCallback;
public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer ensureClusterStateVersionCallback) {
public RecoveriesCollection(Logger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
}
/**
@ -66,7 +63,7 @@ public class RecoveriesCollection {
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}

RecoveryTarget.java

@ -54,7 +54,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@ -75,7 +74,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final MultiFileWriter multiFileWriter;
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
private final LongConsumer ensureClusterStateVersionCallback;
private final AtomicBoolean finished = new AtomicBoolean();
@ -93,14 +91,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
* version; necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler})
*/
public RecoveryTarget(final IndexShard indexShard,
final DiscoveryNode sourceNode,
final PeerRecoveryTargetService.RecoveryListener listener,
final LongConsumer ensureClusterStateVersionCallback) {
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@ -113,7 +105,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
this::ensureRefCount);
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
@ -125,7 +116,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
return new RecoveryTarget(indexShard, sourceNode, listener);
}
public long recoveryId() {
@ -314,11 +305,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
});
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.accept(clusterStateVersion);
}
@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
indexShard.activateWithPrimaryContext(primaryContext);

RecoveryTargetHandler.java

@ -48,11 +48,6 @@ public interface RecoveryTargetHandler {
*/
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
*/
void ensureClusterStateVersion(long clusterStateVersion);
/**
* Handoff the primary context between the relocation source and the relocation target.
*

RemoteRecoveryTargetHandler.java

@ -95,14 +95,6 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
transportService.submitRequest(

IndexLevelReplicationTests.java

@ -120,20 +120,19 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
};
thread.start();
IndexShard replica = shards.addReplica();
Future<Void> future = shards.asyncRecoverReplica(replica, (indexShard, node)
-> new RecoveryTarget(indexShard, node, recoveryListener, version -> {
}) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
Future<Void> future = shards.asyncRecoverReplica(replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
});
});
future.get();
thread.join();
shards.assertAllEqual(numDocs);
@ -197,7 +196,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
thread.start();
IndexShard replica = shards.addReplica();
Future<Void> fut = shards.asyncRecoverReplica(replica,
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
ActionListener<Void> listener) {

RecoveryDuringReplicationTests.java

@ -491,7 +491,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
AtomicBoolean recoveryDone = new AtomicBoolean(false);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
return new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
recoveryDone.set(true);
@ -556,7 +556,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
final IndexShard replica = shards.addReplica();
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
@ -811,7 +811,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery,
IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
Logger logger) {
super(shard, sourceNode, listener, version -> {});
super(shard, sourceNode, listener);
this.recoveryBlocked = recoveryBlocked;
this.releaseRecovery = releaseRecovery;
this.stageToBlock = stageToBlock;

IndexShardTests.java

@ -2427,8 +2427,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}");
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
@ -2550,8 +2549,7 @@ public class IndexShardTests extends IndexShardTestCase {
// Shard is still inactive since we haven't started recovering yet
assertFalse(replica.isActive());
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
@ -2605,8 +2603,7 @@ public class IndexShardTests extends IndexShardTestCase {
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
assertListenerCalled.accept(replica);
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {

PeerRecoveryTargetServiceTests.java

@ -61,7 +61,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
// Empty store
{
recoveryEmptyReplica(replica, true);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
recoveryTarget.decRef();
}
@ -77,7 +77,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
flushShard(replica);
replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
replica.sync();
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
recoveryTarget.decRef();
}
@ -91,7 +91,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
}
}
flushShard(replica);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
recoveryTarget.decRef();
}
@ -99,7 +99,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
{
replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
replica.sync();
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
recoveryTarget.decRef();
}
@ -118,7 +118,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
recoveryTarget.decRef();
}
@ -143,7 +143,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId());
final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null);
final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null);
recoveryTarget.receiveFileInfo(
mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()),
mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()),

RecoverySourceHandlerTests.java

@ -695,10 +695,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
}
@Override
public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {
}

IndicesStoreIntegrationIT.java

@ -161,7 +161,6 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(nodeTo, random());
internalCluster().setDisruptionScheme(disruption);
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeTo);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeTo);
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1);
// use a tracer on the target node to track relocation start and end
@ -177,18 +176,6 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
// to the other nodes that should have a copy according to cluster state.
receivedShardExistsRequestLatch.countDown();
logger.info("received: {}, relocation done", action);
} else if (action.equals(PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE)) {
logger.info("received: {}, waiting on cluster state", action);
// ensure that relocation target node is on the same cluster state as relocation source before proceeding with
// this request. If the target does not have the relocating cluster state exposed through ClusterService.state(),
// then waitForClusterState will have to register a ClusterObserver with the ClusterService, which can cause
// a race with the BlockClusterStateProcessing block that is added below.
try {
assertBusy(() -> assertTrue(
clusterService.state().routingTable().index(index).shard(shard).primaryShard().relocating()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});

RecoveriesCollectionTests.java

@ -53,7 +53,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testLastAccessTimeUpdate() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
final long lastSeenTime = status.target().lastAccessTime();
@ -70,7 +70,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryTimeout() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(),
@ -98,7 +98,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryCancellation() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
@ -117,7 +117,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
shards.startAll();
int numDocs = randomIntBetween(1, 15);
shards.indexDocs(numDocs);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
IndexShard shard = shards.addReplica();
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard);
RecoveryTarget recoveryTarget = collection.getRecoveryTarget(recoveryId);

ESIndexLevelReplicationTestCase.java

@ -410,7 +410,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
public void recoverReplica(IndexShard replica) throws IOException {
recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}));
recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener));
}
public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)

IndexShardTestCase.java

@ -569,8 +569,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
/** recovers a replica from the given primary **/
protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
recoverReplica(replica, primary,
(r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {
}),
(r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener),
true, startReplica);
}

AsyncRecoveryTarget.java

@ -45,11 +45,6 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
this.target = target;
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
target.ensureClusterStateVersion(clusterStateVersion);
}
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener));

ShardFollowTaskReplicationTests.java

@ -358,7 +358,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
// We need to recover the replica async to release the main thread for the following task to fill missing
// operations between the local checkpoint and max_seq_no which the recovering replica is waiting for.
recoveryFuture = group.asyncRecoverReplica(newReplica,
(shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {});
(shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener) {});
}
}
if (recoveryFuture != null) {