From 5856c396ddd9e36094c5471d436be100bb586e60 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Wed, 18 Jul 2018 11:30:44 +0200
Subject: [PATCH] A replica can be promoted and started in one cluster state
 update (#32042)

When a replica is fully recovered (i.e., in `POST_RECOVERY` state) we send a
request to the master to start the shard. The master changes the state of the
replica and publishes a cluster state to that effect. In certain cases, that
cluster state can be processed on the node hosting the replica *together* with
a cluster state that promotes that, now started, replica to a primary. This can
happen due to batched cluster state processing, or if the master died after
having committed the cluster state that starts the shard but before publishing
it to the node with the replica. If the master also held the primary shard, the
new master node will remove the primary (as it failed) and will also
immediately promote the replica (thinking it is started).

Sadly, our code in IndexShard didn't allow for this, which caused
[assertions](https://github.com/elastic/elasticsearch/blob/13917162ad5c59a96ccb4d6a81a5044546c45c22/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java#L482)
to be tripped in some of our test runs.
---
 .../elasticsearch/index/shard/IndexShard.java | 19 ++--
 .../IndexLevelReplicationTests.java           |  2 +-
 .../index/shard/IndexShardTests.java          | 67 +++++---------
 ...actIndicesClusterStateServiceTestCase.java |  8 ++
 .../PeerRecoveryTargetServiceTests.java       |  2 +-
 .../indices/recovery/RecoveryTests.java       |  2 +-
 .../ESIndexLevelReplicationTestCase.java      |  9 +-
 .../index/shard/IndexShardTestCase.java       | 90 +++++++++++++----
 8 files changed, 126 insertions(+), 73 deletions(-)
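Reviewer note, not part of the commit message or the diff (placed below the
diffstat, where patch tooling ignores it): the snippet below is a minimal
sketch of the scenario this change supports, written against the test helpers
this patch adds to IndexShardTestCase (recoveryEmptyReplica, promoteReplica).
The explicit `false` start flag is illustrative only; the actual tests pass
randomBoolean().

    // Build a replica and recover it, but leave it in POST_RECOVERY,
    // i.e. the master has not yet started it:
    IndexShard replica = newShard(false);
    recoveryEmptyReplica(replica, false /* startReplica */);

    // A single updateShardState call -- one cluster state update -- now both
    // starts the shard and promotes it: promoteReplica hands IndexShard a
    // routing entry that is STARTED and primary, with a bumped primary term.
    ShardRouting routing = replica.routingEntry();
    promoteReplica(replica, Collections.singleton(routing.allocationId().getId()),
        new IndexShardRoutingTable.Builder(routing.shardId()).addShard(routing).build());
    closeShards(replica);

This is expressible in tests because the old recoverReplica helper is split
into recoverUnstartedReplica plus startReplicaAfterRecovery, so a test can
stop at POST_RECOVERY and then exercise the promote-and-start-in-one-update
path.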
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b07e22875e8..fc08438a7d9 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -413,10 +413,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
             assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
-
-            if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
-                replicationTracker.activatePrimaryMode(getLocalCheckpoint());
-            }
+            assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
+                replicationTracker.isPrimaryMode() :
+                "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
 
             changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
         } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
@@ -432,7 +431,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             final CountDownLatch shardStateUpdated = new CountDownLatch(1);
 
             if (newRouting.primary()) {
-                if (newPrimaryTerm != primaryTerm) {
+                if (newPrimaryTerm == primaryTerm) {
+                    if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
+                        // the master started a recovering primary, activate primary mode.
+                        replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                    }
+                } else {
                     assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
                     /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
                      * in one state causing it's term to be incremented. Note that if both current shard state and new
@@ -521,6 +525,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             }
             // set this last, once we finished updating all internal state.
             this.shardRouting = newRouting;
+
+            assert this.shardRouting.primary() == false ||
+                this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
+                this.replicationTracker.isPrimaryMode()
+                : "a started primary must be in primary mode " + this.shardRouting;
             shardStateUpdated.countDown();
         }
         if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 018548be962..b05b1e5cc5c 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -363,7 +363,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
         logger.info("--> Promote replica2 as the primary");
         shards.promoteReplicaToPrimary(replica2);
         logger.info("--> Recover replica3 from replica2");
-        recoverReplica(replica3, replica2);
+        recoverReplica(replica3, replica2, true);
         try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
             assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
             assertThat(snapshot.next(), equalTo(op2));
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 2e07ec950a5..15e6151457f 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -226,6 +226,7 @@ public class IndexShardTests extends IndexShardTestCase {
     }
 
     public void testFailShard() throws Exception {
+        allowShardFailures();
         IndexShard shard = newStartedShard();
         final ShardPath shardPath = shard.shardPath();
         assertNotNull(shardPath);
@@ -309,7 +310,8 @@ public class IndexShardTests extends IndexShardTestCase {
     }
 
     public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
 
         final int operations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
@@ -353,20 +355,10 @@ public class IndexShardTests extends IndexShardTestCase {
         barrier.await();
         latch.await();
 
-        // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
-            Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
+
 
         final int delayedOperations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -428,8 +420,9 @@ public class IndexShardTests extends IndexShardTestCase {
      * 1) Internal state (ala ReplicationTracker) have been updated
      * 2) Primary term is set to the new term
      */
-    public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+    public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
         final long promotedTerm = indexShard.getPrimaryTerm() + 1;
         final CyclicBarrier barrier = new CyclicBarrier(2);
         final AtomicBoolean stop = new AtomicBoolean();
@@ -448,18 +441,10 @@ public class IndexShardTests extends IndexShardTestCase {
         });
         thread.start();
 
-        final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
-            ShardRoutingState.STARTED, replicaRouting.allocationId());
-
-
-        final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
-        final IndexShardRoutingTable routingTable =
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
         barrier.await();
-        // promote the replica
-        indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
-            Collections.emptySet());
+        final ShardRouting replicaRouting = indexShard.routingEntry();
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
 
         stop.set(true);
         thread.join();
@@ -468,7 +453,8 @@ public class IndexShardTests extends IndexShardTestCase {
 
 
     public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
 
         // most of the time this is large enough that most of the time there will be at least one gap
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -479,17 +465,8 @@ public class IndexShardTests extends IndexShardTestCase {
 
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
 
         /*
          * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -506,7 +483,7 @@ public class IndexShardTests extends IndexShardTestCase {
 
                 @Override
                 public void onFailure(Exception e) {
-                    throw new RuntimeException(e);
+                    throw new AssertionError(e);
                 }
             },
             ThreadPool.Names.GENERIC, "");
@@ -846,7 +823,7 @@ public class IndexShardTests extends IndexShardTestCase {
         // add a replica
         recoverShardFromStore(primaryShard);
         final IndexShard replicaShard = newShard(shardId, false);
-        recoverReplica(replicaShard, primaryShard);
+        recoverReplica(replicaShard, primaryShard, true);
        final int maxSeqNo = randomIntBetween(0, 128);
         for (int i = 0; i <= maxSeqNo; i++) {
             EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
@@ -1625,7 +1602,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
         final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
         updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
-        recoverReplica(primaryTarget, primarySource);
+        recoverReplica(primaryTarget, primarySource, true);
 
         // check that local checkpoint of new primary is properly tracked after primary relocation
         assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
@@ -2082,7 +2059,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 assertFalse(replica.isSyncNeeded());
                 return localCheckpoint;
             }
-        }, true);
+        }, true, true);
 
         closeShards(primary, replica);
     }
@@ -2189,7 +2166,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 assertTrue(replica.isActive());
                 return localCheckpoint;
             }
-        }, false);
+        }, false, true);
 
         closeShards(primary, replica);
     }
@@ -2241,7 +2218,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 super.finalizeRecovery(globalCheckpoint);
                 assertListenerCalled.accept(replica);
             }
-        }, false);
+        }, false, true);
 
         closeShards(primary, replica);
     }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 35bbc497838..5c6b000f7e5 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -357,6 +357,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
                 assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
                     shardRouting.active());
             }
+            if (this.shardRouting.primary()) {
+                assertTrue("a primary shard can't be demoted", shardRouting.primary());
+            } else if (shardRouting.primary()) {
+                // note: it's ok for a replica in post recovery to be started and promoted at once
+                // this can happen when the primary failed after we sent the start shard message
+                assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
+                    shardRouting.active());
+            }
             this.shardRouting = shardRouting;
             if (shardRouting.primary()) {
                 term = newPrimaryTerm;
current: " + this.shardRouting + " new: " + shardRouting, + shardRouting.active()); + } this.shardRouting = shardRouting; if (shardRouting.primary()) { term = newPrimaryTerm; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 3b50fa64915..4b1419375e6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -43,7 +43,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { try { // Empty store { - recoveryEmptyReplica(replica); + recoveryEmptyReplica(replica, true); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L)); recoveryTarget.decRef(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index e7606328c76..aaba17c3151 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -261,7 +261,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { } IndexShard replicaShard = newShard(primaryShard.shardId(), false); updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData()); - recoverReplica(replicaShard, primaryShard); + recoverReplica(replicaShard, primaryShard, true); List commits = DirectoryReader.listCommits(replicaShard.store().directory()); long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 9de88216822..5a5ee12065c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -265,7 +265,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase RecoverySource.PeerRecoverySource.INSTANCE); final IndexShard newReplica = - newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER); + newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER); replicas.add(newReplica); updateAllocationIDsOnPrimary(); return newReplica; @@ -341,8 +341,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase IndexShard replica, BiFunction targetSupplier, boolean markAsRecovering) throws IOException { - ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(), - routingTable(Function.identity())); + final IndexShardRoutingTable routingTable = routingTable(Function.identity()); + final Set inSyncIds = activeIds(); + ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, + routingTable); + 
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index e4849be20e1..0cbc6e44502 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -92,8 +92,10 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
@@ -108,6 +110,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
 
     public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {};
 
+    private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true);
+
+    private static final Consumer<IndexShard.ShardFailure> DEFAULT_SHARD_FAILURE_HANDLER = failure -> {
+        if (failOnShardFailures.get()) {
+            throw new AssertionError(failure.reason, failure.cause);
+        }
+    };
+
     protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
         @Override
         public void onRecoveryDone(RecoveryState state) {
@@ -128,6 +138,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         super.setUp();
         threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
         primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
+        failOnShardFailures.set(true);
     }
 
     @Override
@@ -139,6 +150,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
         }
     }
 
+    /**
+     * By default, tests will fail if any shard created by this class fails. Tests that cause failures by design
+     * can call this method to ignore those failures.
+     *
+     */
+    protected void allowShardFailures() {
+        failOnShardFailures.set(false);
+    }
+
     public Settings threadPoolSettings() {
         return Settings.EMPTY;
     }
@@ -270,7 +290,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
 
     /**
      * creates a new initializing shard.
-     * @param routing                shard routing to use
+     * @param routing                       shard routing to use
      * @param shardPath                     path to use for shard data
      * @param indexMetaData                 indexMetaData for the shard, including any mapping
      * @param indexSearcherWrapper          an optional wrapper to be used during searchers
@@ -302,6 +322,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
                 engineFactory, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer,
                 Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, breakerService);
+            indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;
         } finally {
             if (success == false) {
@@ -358,7 +379,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         if (primary) {
             recoverShardFromStore(shard);
         } else {
-            recoveryEmptyReplica(shard);
+            recoveryEmptyReplica(shard, true);
         }
         return shard;
     }
@@ -399,11 +420,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
             inSyncIds, newRoutingTable, Collections.emptySet());
     }
 
-    protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
+    protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) throws IOException {
         IndexShard primary = null;
         try {
             primary = newStartedShard(true);
-            recoverReplica(replica, primary);
+            recoverReplica(replica, primary, startReplica);
         } finally {
             closeShards(primary);
         }
@@ -415,42 +436,48 @@ public abstract class IndexShardTestCase extends ESTestCase {
     }
 
     /** recovers a replica from the given primary **/
-    protected void recoverReplica(IndexShard replica, IndexShard primary) throws IOException {
+    protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
         recoverReplica(replica, primary,
             (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {
             }),
-            true);
+            true, true);
     }
 
     /** recovers a replica from the given primary **/
     protected void recoverReplica(final IndexShard replica,
                                   final IndexShard primary,
                                   final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                  final boolean markAsRecovering) throws IOException {
+                                  final boolean markAsRecovering, final boolean markAsStarted) throws IOException {
         IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
         newRoutingTable.addShard(primary.routingEntry());
         if (replica.routingEntry().isRelocationTarget() == false) {
             newRoutingTable.addShard(replica.routingEntry());
         }
-        recoverReplica(replica, primary, targetSupplier, markAsRecovering,
-            Collections.singleton(primary.routingEntry().allocationId().getId()),
-            newRoutingTable.build());
+        final Set<String> inSyncIds = Collections.singleton(primary.routingEntry().allocationId().getId());
+        final IndexShardRoutingTable routingTable = newRoutingTable.build();
+        recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable);
+        if (markAsStarted) {
+            startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
+        }
     }
 
     /**
      * Recovers a replica from the give primary, allow the user to supply a custom recovery target. A typical usage of a custom recovery
      * target is to assert things in the various stages of recovery.
+     *
+     * Note: this method keeps the shard in {@link IndexShardState#POST_RECOVERY} and doesn't start it.
+     *
      * @param replica                the recovery target shard
      * @param primary                the recovery source shard
      * @param targetSupplier         supplies an instance of {@link RecoveryTarget}
      * @param markAsRecovering       set to {@code false} if the replica is marked as recovering
      */
-    protected final void recoverReplica(final IndexShard replica,
-                                        final IndexShard primary,
-                                        final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                        final boolean markAsRecovering,
-                                        final Set<String> inSyncIds,
-                                        final IndexShardRoutingTable routingTable) throws IOException {
+    protected final void recoverUnstartedReplica(final IndexShard replica,
+                                                 final IndexShard primary,
+                                                 final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
+                                                 final boolean markAsRecovering,
+                                                 final Set<String> inSyncIds,
+                                                 final IndexShardRoutingTable routingTable) throws IOException {
         final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
         if (markAsRecovering) {
@@ -478,11 +505,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
             request,
             (int) ByteSizeUnit.MB.toBytes(1),
             Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
-        final ShardRouting initializingReplicaRouting = replica.routingEntry();
         primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
             inSyncIds, routingTable, Collections.emptySet());
         recovery.recoverToTarget();
         recoveryTarget.markAsDone();
+    }
+
+    protected void startReplicaAfterRecovery(IndexShard replica, IndexShard primary, Set<String> inSyncIds,
+                                             IndexShardRoutingTable routingTable) throws IOException {
+        ShardRouting initializingReplicaRouting = replica.routingEntry();
         IndexShardRoutingTable newRoutingTable =
             initializingReplicaRouting.isRelocationTarget() ?
                 new IndexShardRoutingTable.Builder(routingTable)
@@ -502,6 +533,31 @@ public abstract class IndexShardTestCase extends ESTestCase {
             currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
     }
 
+    /**
+     * Promotes a replica to primary, incrementing its term and starting it if needed.
+     */
+    protected void promoteReplica(IndexShard replica, Set<String> inSyncIds, IndexShardRoutingTable routingTable) throws IOException {
+        assertThat(inSyncIds, contains(replica.routingEntry().allocationId().getId()));
+        final ShardRouting routingEntry = newShardRouting(
+            replica.routingEntry().shardId(),
+            replica.routingEntry().currentNodeId(),
+            null,
+            true,
+            ShardRoutingState.STARTED,
+            replica.routingEntry().allocationId());
+
+        final IndexShardRoutingTable newRoutingTable = new IndexShardRoutingTable.Builder(routingTable)
+            .removeShard(replica.routingEntry())
+            .addShard(routingEntry)
+            .build();
+        replica.updateShardState(routingEntry, replica.getPrimaryTerm() + 1,
+            (is, listener) ->
+                listener.onResponse(new PrimaryReplicaSyncer.ResyncTask(1, "type", "action", "desc", null, Collections.emptyMap())),
+            currentClusterStateVersion.incrementAndGet(),
+            inSyncIds, newRoutingTable, Collections.emptySet());
+    }
+
     private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
         Store.MetadataSnapshot result;
         try {