Ensure relocation target is still tracked when starting handoff (#42201)

If the master removes the relocating shard, but recovery isn't aware of
it, then we can enter an invalid state where ReplicationTracker does not
include the local shard.
This commit is contained in:
Nhat Nguyen 2019-06-25 22:38:55 -04:00
parent 58179af5af
commit 05e1f55a88
5 changed files with 66 additions and 28 deletions

View File

@ -105,12 +105,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private volatile long operationPrimaryTerm; private volatile long operationPrimaryTerm;
/** /**
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the * {@link #startRelocationHandoff(String)} and is finished by either calling {@link #completeRelocationHandoff} or
* handoff was successful or not. During the handoff, which has as main objective to transfer the internal state of the global * {@link #abortRelocationHandoff}, depending on whether the handoff was successful or not. During the handoff, which has as main
* checkpoint tracker from the relocation source to the target, the list of in-sync shard copies cannot grow, otherwise the relocation * objective to transfer the internal state of the global checkpoint tracker from the relocation source to the target, the list of
* target might miss this information and increase the global checkpoint too eagerly. As a consequence, some of the methods in this class * in-sync shard copies cannot grow, otherwise the relocation target might miss this information and increase the global checkpoint
* are not allowed to be called while a handoff is in progress, in particular {@link #markAllocationIdAsInSync}. * too eagerly. As a consequence, some of the methods in this class are not allowed to be called while a handoff is in progress,
* in particular {@link #markAllocationIdAsInSync}.
* *
* A notable exception to this is the method {@link #updateFromMaster}, which is still allowed to be called during a relocation handoff. * A notable exception to this is the method {@link #updateFromMaster}, which is still allowed to be called during a relocation handoff.
* The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case * The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case
@ -989,11 +990,15 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
/** /**
* Initiates a relocation handoff and returns the corresponding primary context. * Initiates a relocation handoff and returns the corresponding primary context.
*/ */
public synchronized PrimaryContext startRelocationHandoff() { public synchronized PrimaryContext startRelocationHandoff(String targetAllocationId) {
assert invariant(); assert invariant();
assert primaryMode; assert primaryMode;
assert handoffInProgress == false; assert handoffInProgress == false;
assert pendingInSync.isEmpty() : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync; assert pendingInSync.isEmpty() : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync;
if (checkpoints.containsKey(targetAllocationId) == false) {
// can happen if the relocation target was removed from cluster but the recovery process isn't aware of that.
throw new IllegalStateException("relocation target [" + targetAllocationId + "] is no longer part of the replication group");
}
handoffInProgress = true; handoffInProgress = true;
// copy clusterStateVersion and checkpoints and return // copy clusterStateVersion and checkpoints and return
// all the entries from checkpoints that are inSync: the reason we don't need to care about initializing non-insync entries // all the entries from checkpoints that are inSync: the reason we don't need to care about initializing non-insync entries
@ -1047,6 +1052,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) { public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) {
assert invariant(); assert invariant();
assert primaryMode == false; assert primaryMode == false;
if (primaryContext.checkpoints.containsKey(shardAllocationId) == false) {
// can happen if the old primary was on an old version
assert indexSettings.getIndexVersionCreated().before(Version.V_7_3_0);
throw new IllegalStateException("primary context [" + primaryContext + "] does not contain " + shardAllocationId);
}
final Runnable runAfter = getMasterUpdateOperationFromCurrentState(); final Runnable runAfter = getMasterUpdateOperationFromCurrentState();
primaryMode = true; primaryMode = true;
// capture current state to possibly replay missed cluster state update // capture current state to possibly replay missed cluster state update

View File

@ -619,10 +619,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* *
* @param consumer a {@link Runnable} that is executed after operations are blocked * @param consumer a {@link Runnable} that is executed after operations are blocked
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
* @throws IllegalStateException if the relocation target is no longer part of the replication group
* @throws InterruptedException if blocking operations is interrupted * @throws InterruptedException if blocking operations is interrupted
*/ */
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer) public void relocated(final String targetAllocationId, final Consumer<ReplicationTracker.PrimaryContext> consumer)
throws IllegalIndexShardStateException, InterruptedException { throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
final Releasable forceRefreshes = refreshListeners.forceRefreshes(); final Releasable forceRefreshes = refreshListeners.forceRefreshes();
try { try {
@ -636,7 +637,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
*/ */
verifyRelocatingState(); verifyRelocatingState();
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(); final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
try { try {
consumer.accept(primaryContext); consumer.accept(primaryContext);
synchronized (mutex) { synchronized (mutex) {

View File

@ -655,7 +655,7 @@ public class RecoverySourceHandler {
logger.trace("performing relocation hand-off"); logger.trace("performing relocation hand-off");
// TODO: make relocated async // TODO: make relocated async
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext)); cancellableThreads.execute(() -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext));
/* /*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}). * target are failed (see {@link IndexShard#updateRoutingEntry}).

View File

@ -716,7 +716,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
newPrimary.shardAllocationId, newPrimary.shardAllocationId,
Math.max(SequenceNumbers.NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5))); Math.max(SequenceNumbers.NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5)));
oldPrimary.updateGlobalCheckpointForShard(newPrimary.shardAllocationId, oldPrimary.getGlobalCheckpoint()); oldPrimary.updateGlobalCheckpointForShard(newPrimary.shardAllocationId, oldPrimary.getGlobalCheckpoint());
ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(); ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(newPrimary.shardAllocationId);
if (randomBoolean()) { if (randomBoolean()) {
// cluster state update after primary context handoff // cluster state update after primary context handoff
@ -742,7 +742,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
} }
// do another handoff // do another handoff
primaryContext = oldPrimary.startRelocationHandoff(); primaryContext = oldPrimary.startRelocationHandoff(newPrimary.shardAllocationId);
} }
// send primary context through the wire // send primary context through the wire

View File

@ -877,7 +877,7 @@ public class IndexShardTests extends IndexShardTestCase {
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
IndexShardTestCase.updateRoutingEntry(indexShard, routing); IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexShard.relocated(primaryContext -> {}); indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
engineClosed = false; engineClosed = false;
break; break;
} }
@ -1746,12 +1746,13 @@ public class IndexShardTests extends IndexShardTestCase {
public void testLockingBeforeAndAfterRelocated() throws Exception { public void testLockingBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node");
IndexShardTestCase.updateRoutingEntry(shard, routing);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> { Thread recoveryThread = new Thread(() -> {
latch.countDown(); latch.countDown();
try { try {
shard.relocated(primaryContext -> {}); shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1776,13 +1777,14 @@ public class IndexShardTests extends IndexShardTestCase {
public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node");
IndexShardTestCase.updateRoutingEntry(shard, routing);
final CountDownLatch startRecovery = new CountDownLatch(1); final CountDownLatch startRecovery = new CountDownLatch(1);
final CountDownLatch relocationStarted = new CountDownLatch(1); final CountDownLatch relocationStarted = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> { Thread recoveryThread = new Thread(() -> {
try { try {
startRecovery.await(); startRecovery.await();
shard.relocated(primaryContext -> relocationStarted.countDown()); shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown());
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1847,7 +1849,8 @@ public class IndexShardTests extends IndexShardTestCase {
public void testStressRelocated() throws Exception { public void testStressRelocated() throws Exception {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
assertFalse(shard.isRelocatedPrimary()); assertFalse(shard.isRelocatedPrimary());
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node");
IndexShardTestCase.updateRoutingEntry(shard, routing);
final int numThreads = randomIntBetween(2, 4); final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads]; Thread[] indexThreads = new Thread[numThreads];
CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads); CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads);
@ -1869,7 +1872,7 @@ public class IndexShardTests extends IndexShardTestCase {
AtomicBoolean relocated = new AtomicBoolean(); AtomicBoolean relocated = new AtomicBoolean();
final Thread recoveryThread = new Thread(() -> { final Thread recoveryThread = new Thread(() -> {
try { try {
shard.relocated(primaryContext -> {}); shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1902,8 +1905,9 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException { public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting originalRouting = shard.routingEntry();
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node");
shard.relocated(primaryContext -> {}); IndexShardTestCase.updateRoutingEntry(shard, routing);
shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting));
closeShards(shard); closeShards(shard);
} }
@ -1911,16 +1915,19 @@ public class IndexShardTests extends IndexShardTestCase {
public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException { public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting originalRouting = shard.routingEntry();
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node");
IndexShardTestCase.updateRoutingEntry(shard, relocationRouting);
IndexShardTestCase.updateRoutingEntry(shard, originalRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated(primaryContext -> {})); expectThrows(IllegalIndexShardStateException.class,
() -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}));
closeShards(shard); closeShards(shard);
} }
public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException { public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting originalRouting = shard.routingEntry();
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node");
IndexShardTestCase.updateRoutingEntry(shard, relocationRouting);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3); CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
AtomicReference<Exception> relocationException = new AtomicReference<>(); AtomicReference<Exception> relocationException = new AtomicReference<>();
Thread relocationThread = new Thread(new AbstractRunnable() { Thread relocationThread = new Thread(new AbstractRunnable() {
@ -1932,7 +1939,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
cyclicBarrier.await(); cyclicBarrier.await();
shard.relocated(primaryContext -> {}); shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
} }
}); });
relocationThread.start(); relocationThread.start();
@ -1960,7 +1967,8 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(relocationException.get(), nullValue()); assertThat(relocationException.get(), nullValue());
} else { } else {
logger.debug("shard relocation was cancelled"); logger.debug("shard relocation was cancelled");
assertThat(relocationException.get(), instanceOf(IllegalIndexShardStateException.class)); assertThat(relocationException.get(),
either(instanceOf(IllegalIndexShardStateException.class)).or(instanceOf(IllegalStateException.class)));
assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(false)); assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(false));
assertThat(cancellingException.get(), nullValue()); assertThat(cancellingException.get(), nullValue());
@ -1968,6 +1976,25 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard); closeShards(shard);
} }
/**
 * Checks that a relocation handoff is refused when it names a target from an earlier, abandoned relocation,
 * and that a handoff naming the target of the relocation currently in effect still goes through.
 *
 * NOTE(review): this relies on each call to {@code ShardRoutingHelper.relocate} producing a distinct target
 * allocation id - confirm against the helper.
 */
public void testRelocateMissingTarget() throws Exception {
    final IndexShard primary = newStartedShard(true);
    final ShardRouting startedRouting = primary.routingEntry();

    // Begin a relocation to node_1, then fall back to the started routing so that relocation is abandoned.
    final ShardRouting firstRelocation = ShardRoutingHelper.relocate(startedRouting, "node_1");
    IndexShardTestCase.updateRoutingEntry(primary, firstRelocation);
    IndexShardTestCase.updateRoutingEntry(primary, startedRouting);

    // Begin a second relocation to node_2; this is the routing in effect for the rest of the test.
    final ShardRouting secondRelocation = ShardRoutingHelper.relocate(startedRouting, "node_2");
    IndexShardTestCase.updateRoutingEntry(primary, secondRelocation);

    final String abandonedTargetId = firstRelocation.getTargetRelocatingShard().allocationId().getId();
    final String activeTargetId = secondRelocation.getTargetRelocatingShard().allocationId().getId();
    final AtomicBoolean handoffInvoked = new AtomicBoolean();

    // Handing off to the abandoned target must fail before the consumer is ever called.
    final IllegalStateException failure = expectThrows(IllegalStateException.class,
        () -> primary.relocated(abandonedTargetId, primaryContext -> handoffInvoked.set(true)));
    assertThat(failure.getMessage(),
        equalTo("relocation target [" + abandonedTargetId + "] is no longer part of the replication group"));
    assertFalse(handoffInvoked.get());

    // Handing off to the active target succeeds and runs the consumer.
    primary.relocated(activeTargetId, primaryContext -> handoffInvoked.set(true));
    assertTrue(handoffInvoked.get());

    closeShards(primary);
}
public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
/* /*
* The flow of this test: * The flow of this test:
@ -2273,7 +2300,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(shard.state(), equalTo(IndexShardState.STARTED)); assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
shard.relocated(primaryContext -> {}); shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {});
assertTrue(shard.isRelocatedPrimary()); assertTrue(shard.isRelocatedPrimary());
try { try {
IndexShardTestCase.updateRoutingEntry(shard, origRouting); IndexShardTestCase.updateRoutingEntry(shard, origRouting);