From 05e1f55a8899b284e2cf766ded0e03cdad92e749 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Jun 2019 22:38:55 -0400 Subject: [PATCH] Ensure relocation target still tracked when start handoff (#42201) If the master removes the relocating shard, but recovery isn't aware of it, then we can enter an invalid state where ReplicationTracker does not include the local shard. --- .../index/seqno/ReplicationTracker.java | 24 +++++--- .../elasticsearch/index/shard/IndexShard.java | 7 ++- .../recovery/RecoverySourceHandler.java | 2 +- .../index/seqno/ReplicationTrackerTests.java | 4 +- .../index/shard/IndexShardTests.java | 57 ++++++++++++++----- 5 files changed, 66 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index ae5155653ed..6f360577ba3 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -105,12 +105,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private volatile long operationPrimaryTerm; /** - * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} - * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the - * handoff was successful or not. During the handoff, which has as main objective to transfer the internal state of the global - * checkpoint tracker from the relocation source to the target, the list of in-sync shard copies cannot grow, otherwise the relocation - * target might miss this information and increase the global checkpoint to eagerly. 
As consequence, some of the methods in this class - * are not allowed to be called while a handoff is in progress, in particular {@link #markAllocationIdAsInSync}. + * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling + * {@link #startRelocationHandoff(String)} and is finished by either calling {@link #completeRelocationHandoff} or + * {@link #abortRelocationHandoff}, depending on whether the handoff was successful or not. During the handoff, which has as main + * objective to transfer the internal state of the global checkpoint tracker from the relocation source to the target, the list of + * in-sync shard copies cannot grow, otherwise the relocation target might miss this information and increase the global checkpoint + * too eagerly. As a consequence, some of the methods in this class are not allowed to be called while a handoff is in progress, + * in particular {@link #markAllocationIdAsInSync}. * * A notable exception to this is the method {@link #updateFromMaster}, which is still allowed to be called during a relocation handoff. * The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case @@ -989,11 +990,15 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L /** * Initiates a relocation handoff and returns the corresponding primary context. */ - public synchronized PrimaryContext startRelocationHandoff() { + public synchronized PrimaryContext startRelocationHandoff(String targetAllocationId) { assert invariant(); assert primaryMode; assert handoffInProgress == false; assert pendingInSync.isEmpty() : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync; + if (checkpoints.containsKey(targetAllocationId) == false) { + // can happen if the relocation target was removed from the cluster but the recovery process isn't aware of that. 
+ throw new IllegalStateException("relocation target [" + targetAllocationId + "] is no longer part of the replication group"); + } handoffInProgress = true; // copy clusterStateVersion and checkpoints and return // all the entries from checkpoints that are inSync: the reason we don't need to care about initializing non-insync entries @@ -1047,6 +1052,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) { assert invariant(); assert primaryMode == false; + if (primaryContext.checkpoints.containsKey(shardAllocationId) == false) { + // can happen if the old primary was on an old version + assert indexSettings.getIndexVersionCreated().before(Version.V_7_3_0); + throw new IllegalStateException("primary context [" + primaryContext + "] does not contain " + shardAllocationId); + } final Runnable runAfter = getMasterUpdateOperationFromCurrentState(); primaryMode = true; // capture current state to possibly replay missed cluster state update diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4644a966516..7b4e06a451c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -619,10 +619,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * * @param consumer a {@link Runnable} that is executed after operations are blocked * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation + * @throws IllegalStateException if the relocation target is no longer part of the replication group * @throws InterruptedException if blocking operations is interrupted */ - public void relocated(final Consumer consumer) - throws IllegalIndexShardStateException, InterruptedException { + public void 
relocated(final String targetAllocationId, final Consumer consumer) + throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; final Releasable forceRefreshes = refreshListeners.forceRefreshes(); try { @@ -636,7 +637,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. */ verifyRelocatingState(); - final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); try { consumer.accept(primaryContext); synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index ff5ea0b2930..f3e10c13c21 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -655,7 +655,7 @@ public class RecoverySourceHandler { logger.trace("performing relocation hand-off"); // TODO: make relocated async // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done - cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext)); + cancellableThreads.execute(() -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext)); /* * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). 
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 10e84e6ec53..4af8abf0e9c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -716,7 +716,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase { newPrimary.shardAllocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5))); oldPrimary.updateGlobalCheckpointForShard(newPrimary.shardAllocationId, oldPrimary.getGlobalCheckpoint()); - ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(); + ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(newPrimary.shardAllocationId); if (randomBoolean()) { // cluster state update after primary context handoff @@ -742,7 +742,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase { } // do another handoff - primaryContext = oldPrimary.startRelocationHandoff(); + primaryContext = oldPrimary.startRelocationHandoff(newPrimary.shardAllocationId); } // send primary context through the wire diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9bef8974563..1f08107d6dc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -877,7 +877,7 @@ public class IndexShardTests extends IndexShardTestCase { routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated(primaryContext -> 
{}); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); engineClosed = false; break; } @@ -1746,12 +1746,13 @@ public class IndexShardTests extends IndexShardTestCase { public void testLockingBeforeAndAfterRelocated() throws Exception { final IndexShard shard = newStartedShard(true); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); CountDownLatch latch = new CountDownLatch(1); Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated(primaryContext -> {}); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1776,13 +1777,14 @@ public class IndexShardTests extends IndexShardTestCase { public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { final IndexShard shard = newStartedShard(true); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); final CountDownLatch startRecovery = new CountDownLatch(1); final CountDownLatch relocationStarted = new CountDownLatch(1); Thread recoveryThread = new Thread(() -> { try { startRecovery.await(); - shard.relocated(primaryContext -> relocationStarted.countDown()); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown()); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1847,7 +1849,8 @@ public class IndexShardTests extends IndexShardTestCase { public void testStressRelocated() throws Exception { final 
IndexShard shard = newStartedShard(true); assertFalse(shard.isRelocatedPrimary()); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); final int numThreads = randomIntBetween(2, 4); Thread[] indexThreads = new Thread[numThreads]; CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads); @@ -1869,7 +1872,7 @@ public class IndexShardTests extends IndexShardTestCase { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated(primaryContext -> {}); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1902,8 +1905,9 @@ public class IndexShardTests extends IndexShardTestCase { public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); - shard.relocated(primaryContext -> {}); + final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } @@ -1911,16 +1915,19 @@ public class IndexShardTests extends IndexShardTestCase { public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = 
shard.routingEntry(); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); + final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated(primaryContext -> {})); + expectThrows(IllegalIndexShardStateException.class, + () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {})); closeShards(shard); } public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); - IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); + final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); CyclicBarrier cyclicBarrier = new CyclicBarrier(3); AtomicReference relocationException = new AtomicReference<>(); Thread relocationThread = new Thread(new AbstractRunnable() { @@ -1932,7 +1939,7 @@ public class IndexShardTests extends IndexShardTestCase { @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated(primaryContext -> {}); + shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); } }); relocationThread.start(); @@ -1960,7 +1967,8 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(relocationException.get(), nullValue()); } else { logger.debug("shard relocation was cancelled"); - assertThat(relocationException.get(), instanceOf(IllegalIndexShardStateException.class)); + assertThat(relocationException.get(), 
+ either(instanceOf(IllegalIndexShardStateException.class)).or(instanceOf(IllegalStateException.class))); assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(false)); assertThat(cancellingException.get(), nullValue()); @@ -1968,6 +1976,25 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(shard); } + public void testRelocateMissingTarget() throws Exception { + final IndexShard shard = newStartedShard(true); + final ShardRouting original = shard.routingEntry(); + final ShardRouting toNode1 = ShardRoutingHelper.relocate(original, "node_1"); + IndexShardTestCase.updateRoutingEntry(shard, toNode1); + IndexShardTestCase.updateRoutingEntry(shard, original); + final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); + IndexShardTestCase.updateRoutingEntry(shard, toNode2); + final AtomicBoolean relocated = new AtomicBoolean(); + final IllegalStateException error = expectThrows(IllegalStateException.class, + () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true))); + assertThat(error.getMessage(), equalTo("relocation target [" + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group")); + assertFalse(relocated.get()); + shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true)); + assertTrue(relocated.get()); + closeShards(shard); + } + public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { /* * The flow of this test: @@ -2273,7 +2300,7 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated(primaryContext -> {}); + 
shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting);