A replica can be promoted and started in one cluster state update (#32042)

When a replica is fully recovered (i.e., in `POST_RECOVERY` state), we send a request to the master
to start the shard. The master changes the state of the replica and publishes a cluster state to that
effect. In certain cases, that cluster state can be processed on the node hosting the replica
*together* with a cluster state that promotes that, now started, replica to a primary. This can
happen due to batched cluster state processing, or if the master died after committing the
cluster state that starts the shard but before publishing it to the node with the replica. If the master
also held the primary shard, the new master node will remove the primary (as it failed) and will
immediately promote the replica (thinking it is started).

Sadly, our code in `IndexShard` didn't allow for this, which caused [assertions](13917162ad/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java (L482)) to trip in some of our test runs.
Boaz Leskes, 2018-07-18 11:30:44 +02:00 (committed by GitHub)
commit 5856c396dd, parent ef5e8d8d8a
8 changed files with 126 additions and 73 deletions
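
At its core, the fix moves primary-mode activation in `IndexShard#updateShardState` off the `POST_RECOVERY` to `STARTED` transition and keys it on the primary term and routing entry instead. Here is a condensed, non-authoritative sketch of the resulting control flow, paraphrased from the `IndexShard.java` hunks below (the real method handles much more state):

```java
// Sketch only -- condensed from the IndexShard.java diff below.
if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
    // Previously, primary mode was activated here, which assumed that "started"
    // and "promoted" always arrive in separate cluster state updates.
    changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
}
if (newRouting.primary()) {
    if (newPrimaryTerm == primaryTerm) {
        if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
            // the master started a recovering primary: activate primary mode
            replicationTracker.activatePrimaryMode(getLocalCheckpoint());
        }
    } else {
        // a term bump means promotion; this path now also covers a replica
        // that is started and promoted in the same cluster state update
    }
}
```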

**IndexShard.java**

```diff
@@ -413,10 +413,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
             assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
+            assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
+                replicationTracker.isPrimaryMode() :
+                "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
 
-            if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
-                replicationTracker.activatePrimaryMode(getLocalCheckpoint());
-            }
-
             changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
         } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
@@ -432,7 +431,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final CountDownLatch shardStateUpdated = new CountDownLatch(1);
 
         if (newRouting.primary()) {
-            if (newPrimaryTerm != primaryTerm) {
+            if (newPrimaryTerm == primaryTerm) {
+                if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
+                    // the master started a recovering primary, activate primary mode.
+                    replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                }
+            } else {
                 assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
                 /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
                  * in one state causing it's term to be incremented. Note that if both current shard state and new
@@ -521,6 +525,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             }
             // set this last, once we finished updating all internal state.
             this.shardRouting = newRouting;
+
+            assert this.shardRouting.primary() == false ||
+                this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
+                this.replicationTracker.isPrimaryMode()
+                : "an started primary must be in primary mode " + this.shardRouting;
             shardStateUpdated.countDown();
         }
         if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
```

**IndexLevelReplicationTests.java**

```diff
@@ -363,7 +363,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             logger.info("--> Promote replica2 as the primary");
             shards.promoteReplicaToPrimary(replica2);
             logger.info("--> Recover replica3 from replica2");
-            recoverReplica(replica3, replica2);
+            recoverReplica(replica3, replica2, true);
             try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
                 assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                 assertThat(snapshot.next(), equalTo(op2));
```

**IndexShardTests.java**

```diff
@@ -226,6 +226,7 @@ public class IndexShardTests extends IndexShardTestCase {
     }
 
     public void testFailShard() throws Exception {
+        allowShardFailures();
         IndexShard shard = newStartedShard();
         final ShardPath shardPath = shard.shardPath();
         assertNotNull(shardPath);
@@ -309,7 +310,8 @@ public class IndexShardTests extends IndexShardTestCase {
     }
 
     public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
 
         final int operations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
@@ -353,20 +355,10 @@ public class IndexShardTests extends IndexShardTestCase {
         barrier.await();
         latch.await();
 
-        // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
-            Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
 
         final int delayedOperations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -428,8 +420,9 @@ public class IndexShardTests extends IndexShardTestCase {
      * 1) Internal state (ala ReplicationTracker) have been updated
      * 2) Primary term is set to the new term
      */
-    public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+    public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
         final long promotedTerm = indexShard.getPrimaryTerm() + 1;
         final CyclicBarrier barrier = new CyclicBarrier(2);
         final AtomicBoolean stop = new AtomicBoolean();
@@ -448,18 +441,10 @@ public class IndexShardTests extends IndexShardTestCase {
         });
         thread.start();
 
-        final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
-            ShardRoutingState.STARTED, replicaRouting.allocationId());
-
-        final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
-        final IndexShardRoutingTable routingTable =
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
         barrier.await();
-        // promote the replica
-        indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
-            Collections.emptySet());
+        final ShardRouting replicaRouting = indexShard.routingEntry();
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
 
         stop.set(true);
         thread.join();
@@ -468,7 +453,8 @@ public class IndexShardTests extends IndexShardTestCase {
 
     public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
 
         // most of the time this is large enough that most of the time there will be at least one gap
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -479,17 +465,8 @@ public class IndexShardTests extends IndexShardTestCase {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
 
         /*
          * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -506,7 +483,7 @@ public class IndexShardTests extends IndexShardTestCase {
             @Override
             public void onFailure(Exception e) {
-                throw new RuntimeException(e);
+                throw new AssertionError(e);
             }
         },
         ThreadPool.Names.GENERIC, "");
@@ -846,7 +823,7 @@ public class IndexShardTests extends IndexShardTestCase {
         // add a replica
         recoverShardFromStore(primaryShard);
         final IndexShard replicaShard = newShard(shardId, false);
-        recoverReplica(replicaShard, primaryShard);
+        recoverReplica(replicaShard, primaryShard, true);
         final int maxSeqNo = randomIntBetween(0, 128);
         for (int i = 0; i <= maxSeqNo; i++) {
             EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
@@ -1625,7 +1602,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
         final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
         updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
-        recoverReplica(primaryTarget, primarySource);
+        recoverReplica(primaryTarget, primarySource, true);
 
         // check that local checkpoint of new primary is properly tracked after primary relocation
         assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
@@ -2082,7 +2059,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 assertFalse(replica.isSyncNeeded());
                 return localCheckpoint;
             }
-        }, true);
+        }, true, true);
 
         closeShards(primary, replica);
     }
@@ -2189,7 +2166,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 assertTrue(replica.isActive());
                 return localCheckpoint;
             }
-        }, false);
+        }, false, true);
 
         closeShards(primary, replica);
     }
@@ -2241,7 +2218,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 super.finalizeRecovery(globalCheckpoint);
                 assertListenerCalled.accept(replica);
             }
-        }, false);
+        }, false, true);
 
         closeShards(primary, replica);
     }
```

**AbstractIndicesClusterStateServiceTestCase.java**

```diff
@@ -357,6 +357,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
                 assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
                     shardRouting.active());
             }
+            if (this.shardRouting.primary()) {
+                assertTrue("a primary shard can't be demoted", shardRouting.primary());
+            } else if (shardRouting.primary()) {
+                // note: it's ok for a replica in post recovery to be started and promoted at once
+                // this can happen when the primary failed after we sent the start shard message
+                assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
+                    shardRouting.active());
+            }
             this.shardRouting = shardRouting;
             if (shardRouting.primary()) {
                 term = newPrimaryTerm;
```

**PeerRecoveryTargetServiceTests.java**

```diff
@@ -43,7 +43,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         try {
             // Empty store
             {
-                recoveryEmptyReplica(replica);
+                recoveryEmptyReplica(replica, true);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
                 assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
                 recoveryTarget.decRef();
```

**RecoveryTests.java**

```diff
@@ -261,7 +261,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
         }
         IndexShard replicaShard = newShard(primaryShard.shardId(), false);
         updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
-        recoverReplica(replicaShard, primaryShard);
+        recoverReplica(replicaShard, primaryShard, true);
         List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
         long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
         assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
```

**ESIndexLevelReplicationTestCase.java**

```diff
@@ -265,7 +265,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
             RecoverySource.PeerRecoverySource.INSTANCE);
 
         final IndexShard newReplica =
             newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
         replicas.add(newReplica);
         updateAllocationIDsOnPrimary();
         return newReplica;
@@ -341,8 +341,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                 IndexShard replica,
                 BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
                 boolean markAsRecovering) throws IOException {
-            ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(),
-                routingTable(Function.identity()));
+            final IndexShardRoutingTable routingTable = routingTable(Function.identity());
+            final Set<String> inSyncIds = activeIds();
+            ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
+                routingTable);
+            ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
         }
 
         public synchronized DiscoveryNode getPrimaryNode() {
```

**IndexShardTestCase.java**

```diff
@@ -92,8 +92,10 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
@@ -108,6 +110,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
 
     public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {};
 
+    private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true);
+
+    private static final Consumer<IndexShard.ShardFailure> DEFAULT_SHARD_FAILURE_HANDLER = failure -> {
+        if (failOnShardFailures.get()) {
+            throw new AssertionError(failure.reason, failure.cause);
+        }
+    };
+
     protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
         @Override
         public void onRecoveryDone(RecoveryState state) {
@@ -128,6 +138,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         super.setUp();
         threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
         primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
+        failOnShardFailures.set(true);
     }
 
     @Override
@@ -139,6 +150,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
         }
     }
 
+    /**
+     * by default, tests will fail if any shard created by this class fails. Tests that cause failures by design
+     * can call this method to ignore those failures
+     *
+     */
+    protected void allowShardFailures() {
+        failOnShardFailures.set(false);
+    }
+
     public Settings threadPoolSettings() {
         return Settings.EMPTY;
     }
@@ -270,7 +290,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
     /**
      * creates a new initializing shard.
      * @param routing                shard routing to use
      * @param shardPath              path to use for shard data
      * @param indexMetaData          indexMetaData for the shard, including any mapping
     * @param indexSearcherWrapper   an optional wrapper to be used during searchers
@@ -302,6 +322,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
                 engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
                 BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
                 breakerService);
+            indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;
         } finally {
             if (success == false) {
@@ -358,7 +379,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         if (primary) {
             recoverShardFromStore(shard);
         } else {
-            recoveryEmptyReplica(shard);
+            recoveryEmptyReplica(shard, true);
         }
         return shard;
     }
@@ -399,11 +420,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
             inSyncIds, newRoutingTable, Collections.emptySet());
     }
 
-    protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
+    protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) throws IOException {
         IndexShard primary = null;
         try {
             primary = newStartedShard(true);
-            recoverReplica(replica, primary);
+            recoverReplica(replica, primary, startReplica);
         } finally {
             closeShards(primary);
         }
@@ -415,42 +436,48 @@ public abstract class IndexShardTestCase extends ESTestCase {
     }
 
     /** recovers a replica from the given primary **/
-    protected void recoverReplica(IndexShard replica, IndexShard primary) throws IOException {
+    protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
         recoverReplica(replica, primary,
             (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {
             }),
-            true);
+            true, true);
     }
 
     /** recovers a replica from the given primary **/
     protected void recoverReplica(final IndexShard replica,
                                   final IndexShard primary,
                                   final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                  final boolean markAsRecovering) throws IOException {
+                                  final boolean markAsRecovering, final boolean markAsStarted) throws IOException {
         IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
         newRoutingTable.addShard(primary.routingEntry());
         if (replica.routingEntry().isRelocationTarget() == false) {
             newRoutingTable.addShard(replica.routingEntry());
         }
-        recoverReplica(replica, primary, targetSupplier, markAsRecovering,
-            Collections.singleton(primary.routingEntry().allocationId().getId()),
-            newRoutingTable.build());
+        final Set<String> inSyncIds = Collections.singleton(primary.routingEntry().allocationId().getId());
+        final IndexShardRoutingTable routingTable = newRoutingTable.build();
+        recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable);
+        if (markAsStarted) {
+            startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
+        }
     }
 
     /**
      * Recovers a replica from the give primary, allow the user to supply a custom recovery target. A typical usage of a custom recovery
      * target is to assert things in the various stages of recovery.
+     *
+     * Note: this method keeps the shard in {@link IndexShardState#POST_RECOVERY} and doesn't start it.
+     *
      * @param replica                the recovery target shard
      * @param primary                the recovery source shard
     * @param targetSupplier         supplies an instance of {@link RecoveryTarget}
      * @param markAsRecovering       set to {@code false} if the replica is marked as recovering
      */
-    protected final void recoverReplica(final IndexShard replica,
-                                        final IndexShard primary,
-                                        final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                        final boolean markAsRecovering,
-                                        final Set<String> inSyncIds,
-                                        final IndexShardRoutingTable routingTable) throws IOException {
+    protected final void recoverUnstartedReplica(final IndexShard replica,
+                                                 final IndexShard primary,
+                                                 final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
+                                                 final boolean markAsRecovering,
+                                                 final Set<String> inSyncIds,
+                                                 final IndexShardRoutingTable routingTable) throws IOException {
         final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
         if (markAsRecovering) {
@@ -478,11 +505,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
             request,
             (int) ByteSizeUnit.MB.toBytes(1),
             Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
-        final ShardRouting initializingReplicaRouting = replica.routingEntry();
         primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
             inSyncIds, routingTable, Collections.emptySet());
         recovery.recoverToTarget();
         recoveryTarget.markAsDone();
+    }
+
+    protected void startReplicaAfterRecovery(IndexShard replica, IndexShard primary, Set<String> inSyncIds,
+                                             IndexShardRoutingTable routingTable) throws IOException {
+        ShardRouting initializingReplicaRouting = replica.routingEntry();
         IndexShardRoutingTable newRoutingTable =
             initializingReplicaRouting.isRelocationTarget() ?
                 new IndexShardRoutingTable.Builder(routingTable)
@@ -502,6 +533,31 @@ public abstract class IndexShardTestCase extends ESTestCase {
             currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
     }
 
+    /**
+     * promotes a replica to primary, incrementing it's term and starting it if needed
+     */
+    protected void promoteReplica(IndexShard replica, Set<String> inSyncIds, IndexShardRoutingTable routingTable) throws IOException {
+        assertThat(inSyncIds, contains(replica.routingEntry().allocationId().getId()));
+        final ShardRouting routingEntry = newShardRouting(
+            replica.routingEntry().shardId(),
+            replica.routingEntry().currentNodeId(),
+            null,
+            true,
+            ShardRoutingState.STARTED,
+            replica.routingEntry().allocationId());
+
+        final IndexShardRoutingTable newRoutingTable = new IndexShardRoutingTable.Builder(routingTable)
+            .removeShard(replica.routingEntry())
+            .addShard(routingEntry)
+            .build();
+        replica.updateShardState(routingEntry, replica.getPrimaryTerm() + 1,
+            (is, listener) ->
+                listener.onResponse(new PrimaryReplicaSyncer.ResyncTask(1, "type", "action", "desc", null, Collections.emptyMap())),
+            currentClusterStateVersion.incrementAndGet(),
+            inSyncIds, newRoutingTable, Collections.emptySet());
+    }
+
     private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
         Store.MetadataSnapshot result;
         try {
```
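
For illustration, a hypothetical test snippet (not part of this commit) composing the new helpers: `recoverReplica(..., markAsStarted=false)` leaves the replica in `POST_RECOVERY`, and `promoteReplica` then applies a single update that both starts and promotes it, reproducing the scenario described in the commit message.

```java
// Hypothetical usage sketch built on the helpers added above; the helper
// names come from the diff, the composition itself is illustrative.
IndexShard primary = newStartedShard(true);
IndexShard replica = newShard(false);
// recover but don't start: the replica stays in POST_RECOVERY
recoverReplica(replica, primary,
    (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}),
    true /* markAsRecovering */, false /* markAsStarted */);
// one shard-state update that both starts the replica and promotes it to primary
ShardRouting replicaRouting = replica.routingEntry();
promoteReplica(replica, Collections.singleton(replicaRouting.allocationId().getId()),
    new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
closeShards(primary, replica);
```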