Fix concurrency issues between cancelling a relocation and marking shard as relocated (#20443)
Once a primary is marked as relocated, we cannot safely move it back to started (as we have no way of waiting on in-flight operations that are performed on the target primary). If the master cancels the relocation in that state, we fail the primary. Sadly, there is a race condition between the `updateRoutingEntry` method (which is called when the relocation is cancelled by the master) and the `relocated` method. That race condition can leave the shard marked as "relocated" while its routing entry does not reflect the target relocation. This in turn causes NPEs in TransportReplicationAction:

```
java.util.Objects requireNonNull Objects.java 203
org.elasticsearch.action.support.replication.TransportReplicationAction$ConcreteShardRequest <init> TransportReplicationAction.java 982
```

Sadly, once we end up in this state, we will never recover. This commit fixes that race condition by making sure `updateRoutingEntry` acquires the mutex when checking for the relocated status. While at it, I also tightened up the code and added lots of assertions/hard checks.
Commit 10dcfa3304 (parent 1ae8d6123f)
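To make the race concrete, here is a minimal, self-contained sketch of the check-then-act problem and the fix. All names are hypothetical stand-ins, not the actual Elasticsearch classes: `markRelocated()` plays the role of `relocated(...)`, and `cancelRelocation()` plays the role of `updateRoutingEntry(...)` when the master cancels the relocation. Because both paths take the same mutex for the check and the write, exactly one of them wins and the shard state always agrees with the routing entry:

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical sketch of the fixed locking scheme; not the Elasticsearch code.
public class RelocationRaceSketch {
    enum State { STARTED, RELOCATED }

    private final Object mutex = new Object();
    private State state = State.STARTED;
    private boolean routingSaysRelocating = true;

    // plays the role of relocated(...): only move to RELOCATED while the
    // routing entry still reflects a relocation
    void markRelocated() {
        synchronized (mutex) {
            if (routingSaysRelocating == false) {
                throw new IllegalStateException("shard is no longer relocating");
            }
            state = State.RELOCATED;
        }
    }

    // plays the role of updateRoutingEntry(...) on cancellation: never revive
    // a shard that is already marked as relocated
    void cancelRelocation() {
        synchronized (mutex) {
            if (state == State.RELOCATED) {
                throw new IllegalStateException("shard is marked as relocated, cannot move back to started");
            }
            routingSaysRelocating = false;
        }
    }

    public static void main(String[] args) throws Exception {
        RelocationRaceSketch shard = new RelocationRaceSketch();
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread relocator = new Thread(() -> {
            try {
                barrier.await();
                shard.markRelocated();
            } catch (Exception expectedWhenCancellationWins) {
            }
        });
        Thread canceller = new Thread(() -> {
            try {
                barrier.await();
                shard.cancelRelocation();
            } catch (Exception expectedWhenRelocationWins) {
            }
        });
        relocator.start();
        canceller.start();
        relocator.join();
        canceller.join();
        // Whichever thread won, state and routing entry now agree: either
        // RELOCATED with routingSaysRelocating == true, or STARTED with false.
        synchronized (shard.mutex) {
            if ((shard.state == State.RELOCATED) != shard.routingSaysRelocating) {
                throw new AssertionError("state " + shard.state + " disagrees with routing entry");
            }
        }
    }
}
```

Before the fix, the cancellation path did its check outside the mutex, so both checks could pass before either write happened, leaving the shard marked RELOCATED while its routing entry no longer said so.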
Changes to `IndexShard`:

```diff
@@ -48,7 +48,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
@@ -116,9 +115,9 @@ import org.elasticsearch.index.warmer.WarmerStats;
 import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
@@ -135,7 +134,6 @@ import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -368,60 +366,46 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
      * @throws IOException if shard state could not be persisted
      */
    public void updateRoutingEntry(final ShardRouting newRouting) throws IOException {
-        final ShardRouting currentRouting = this.shardRouting;
-        if (!newRouting.shardId().equals(shardId())) {
-            throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
-        }
-        if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
-            throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
-        }
-        if (currentRouting != null) {
-            if (!newRouting.primary() && currentRouting.primary()) {
-                logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
-            }
-            // if its the same routing, return
-            if (currentRouting.equals(newRouting)) {
-                return;
-            }
-        }
-
-        if (state == IndexShardState.POST_RECOVERY) {
-            // if the state is started or relocating (cause it might move right away from started to relocating)
-            // then move to STARTED
-            if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
-                // we want to refresh *before* we move to internal STARTED state
-                try {
-                    getEngine().refresh("cluster_state_started");
-                } catch (Exception e) {
-                    logger.debug("failed to refresh due to move to cluster wide started", e);
-                }
-
-                boolean movedToStarted = false;
-                synchronized (mutex) {
-                    // do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
-                    if (state == IndexShardState.POST_RECOVERY) {
-                        changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
-                        movedToStarted = true;
-                    } else {
-                        logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
-                    }
-                }
-                if (movedToStarted) {
-                    indexEventListener.afterIndexShardStarted(this);
-                }
-            }
-        }
-
-        if (state == IndexShardState.RELOCATED &&
-            (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
-            // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
-            // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
-            // active primaries.
-            throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
-        }
-        this.shardRouting = newRouting;
-        indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
-        persistMetadata(newRouting, currentRouting);
+        final ShardRouting currentRouting;
+        synchronized (mutex) {
+            currentRouting = this.shardRouting;
+
+            if (!newRouting.shardId().equals(shardId())) {
+                throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
+            }
+            if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
+                throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
+            }
+            if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
+                throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+                    + currentRouting + ", new " + newRouting);
+            }
+
+            if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
+                assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
+                // we want to refresh *before* we move to internal STARTED state
+                try {
+                    getEngine().refresh("cluster_state_started");
+                } catch (Exception e) {
+                    logger.debug("failed to refresh due to move to cluster wide started", e);
+                }
+                changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
+            } else if (state == IndexShardState.RELOCATED &&
+                (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
+                // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
+                // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
+                // active primaries.
+                throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
+            }
+            this.shardRouting = newRouting;
+            persistMetadata(newRouting, currentRouting);
+        }
+
+        if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
+            indexEventListener.afterIndexShardStarted(this);
+        }
+        if (newRouting.equals(currentRouting) == false) {
+            indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
+        }
    }

    /**
```
The `relocated(...)` hand-off in the same file gains an assertion and a hard check:

```diff
@@ -451,6 +435,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
    }

    public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
+        assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
        try {
            indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
                // no shard operation locks are being held here, move state from started to relocated
@@ -460,6 +445,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
                if (state != IndexShardState.STARTED) {
                    throw new IndexShardNotStartedException(shardId, state);
                }
+                // if the master cancelled the recovery, the target will be removed
+                // and the recovery will be stopped.
+                // However, it is still possible that we concurrently end up here
+                // and therefore have to ensure we don't mark the shard as relocated when
+                // its shard routing says otherwise.
+                if (shardRouting.relocating() == false) {
+                    throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+                        ": shard is no longer relocating " + shardRouting);
+                }
                changeState(IndexShardState.RELOCATED, reason);
            }
        });
```
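The ordering in `relocated(...)` is what makes the hand-off safe: `blockOperations` first waits (up to 30 minutes) for every in-flight operation lock to be released, and only then runs the block that re-checks the routing entry and flips the state. As a rough sketch of that "drain all permits, then run the transition" idea, here is a simplified stand-in built on a plain fair `Semaphore`; it is a toy under stated assumptions, not the actual `IndexShardOperationsLock` implementation:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for an operations lock: every operation holds one
// permit, and the hand-off drains all permits so the transition runs with
// no operations in flight.
class OperationsGate {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair, so the drain is not starved

    // called at the start of each operation; close() releases the permit
    AutoCloseable acquireOperationPermit() throws InterruptedException {
        semaphore.acquire();
        return semaphore::release;
    }

    // called once by the hand-off; onBlocked runs with all permits held
    void blockOperations(long timeout, TimeUnit unit, Runnable onBlocked) throws InterruptedException {
        if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, unit) == false) {
            throw new IllegalStateException("timed out waiting for in-flight operations to finish");
        }
        try {
            onBlocked.run(); // no operation permits are held here
        } finally {
            semaphore.release(TOTAL_PERMITS);
        }
    }
}
```

Operations that arrive while the gate is draining simply queue on the semaphore, which is why the second hard check inside the blocked section (the `shardRouting.relocating()` test above) is still required: blocking operations says nothing about what the master did to the routing entry in the meantime.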
And the test-side changes in `IndexShardTests`:

```diff
@@ -59,6 +59,7 @@ import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.env.NodeEnvironment;
@@ -109,6 +110,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;

 import static java.util.Collections.emptyMap;
@@ -121,12 +123,27 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.nullValue;

 /**
  * Simple unit-test IndexShard related operations.
  */
 public class IndexShardTests extends IndexShardTestCase {

+    public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws IOException {
+        return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
+    }
+
+    public static void write(ShardStateMetaData shardStateMetaData,
+                             Path... shardPaths) throws IOException {
+        ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
+    }
+
+    public static Engine getEngineFromShard(IndexShard shard) {
+        return shard.getEngineOrNull();
+    }
+
     public void testWriteShardState() throws Exception {
         try (NodeEnvironment env = newNodeEnvironment()) {
             ShardId id = new ShardId("foo", "fooUUID", 1);
```
```diff
@@ -323,10 +340,10 @@ public class IndexShardTests extends IndexShardTestCase {
             }
             case 2: {
                 // relocation source
-                indexShard = newStartedShard(false);
+                indexShard = newStartedShard(true);
                 ShardRouting routing = indexShard.routingEntry();
                 routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
-                    false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
+                    true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
                 indexShard.updateRoutingEntry(routing);
                 indexShard.relocated("test");
                 break;
@@ -371,15 +388,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(indexShard);
     }

-    public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws IOException {
-        return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
-    }
-
-    public static void write(ShardStateMetaData shardStateMetaData,
-                             Path... shardPaths) throws IOException {
-        ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
-    }
-
     public void testAcquireIndexCommit() throws IOException {
         final IndexShard shard = newStartedShard();
         int numDocs = randomInt(20);
```
```diff
@@ -443,7 +451,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }

-
     public void testAsyncFsync() throws InterruptedException, IOException {
         IndexShard shard = newStartedShard();
         Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
@@ -500,7 +507,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(test);
     }

-
     public void testShardStats() throws IOException {
         IndexShard shard = newStartedShard();
         ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(),
```
```diff
@@ -662,6 +668,7 @@ public class IndexShardTests extends IndexShardTestCase {

     public void testLockingBeforeAndAfterRelocated() throws Exception {
         final IndexShard shard = newStartedShard(true);
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
         CountDownLatch latch = new CountDownLatch(1);
         Thread recoveryThread = new Thread(() -> {
             latch.countDown();
@@ -692,6 +699,7 @@ public class IndexShardTests extends IndexShardTestCase {

     public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
         final IndexShard shard = newStartedShard(true);
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
         Thread recoveryThread = new Thread(() -> {
             try {
                 shard.relocated("simulated recovery");
@@ -725,6 +733,7 @@ public class IndexShardTests extends IndexShardTestCase {

     public void testStressRelocated() throws Exception {
         final IndexShard shard = newStartedShard(true);
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
         final int numThreads = randomIntBetween(2, 4);
         Thread[] indexThreads = new Thread[numThreads];
         CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads);
```
```diff
@@ -776,6 +785,75 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }

+    public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException {
+        final IndexShard shard = newStartedShard(true);
+        final ShardRouting originalRouting = shard.routingEntry();
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
+        shard.relocated("test");
+        expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
+        closeShards(shard);
+    }
+
+    public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException {
+        final IndexShard shard = newStartedShard(true);
+        final ShardRouting originalRouting = shard.routingEntry();
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
+        shard.updateRoutingEntry(originalRouting);
+        expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test"));
+        closeShards(shard);
+    }
+
+    public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException {
+        final IndexShard shard = newStartedShard(true);
+        final ShardRouting originalRouting = shard.routingEntry();
+        shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
+        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
+        AtomicReference<Exception> relocationException = new AtomicReference<>();
+        Thread relocationThread = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                relocationException.set(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                cyclicBarrier.await();
+                shard.relocated("test");
+            }
+        });
+        relocationThread.start();
+        AtomicReference<Exception> cancellingException = new AtomicReference<>();
+        Thread cancellingThread = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                cancellingException.set(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                cyclicBarrier.await();
+                shard.updateRoutingEntry(originalRouting);
+            }
+        });
+        cancellingThread.start();
+        cyclicBarrier.await();
+        relocationThread.join();
+        cancellingThread.join();
+        if (shard.state() == IndexShardState.RELOCATED) {
+            logger.debug("shard was relocated successfully");
+            assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class));
+            assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true));
+            assertThat(relocationException.get(), nullValue());
+        } else {
+            logger.debug("shard relocation was cancelled");
+            assertThat(relocationException.get(), instanceOf(IllegalIndexShardStateException.class));
+            assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(false));
+            assertThat(cancellingException.get(), nullValue());
+
+        }
+        closeShards(shard);
+    }
+
     public void testRecoverFromStore() throws IOException {
         final IndexShard shard = newStartedShard(true);
         int translogOps = 1;
```
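Note how the concurrent test is written: since both interleavings are legal, `testRelocatedShardCanNotBeRevivedConcurrently` does not assert a fixed winner. It instead checks the invariant the fix establishes: whichever thread loses the race gets an `IllegalIndexShardStateException`, the other side succeeds, and the final routing entry always agrees with the final shard state.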
```diff
@@ -1033,7 +1111,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }

-
     public void testIndexingOperationListenersIsInvokedOnRecovery() throws IOException {
         IndexShard shard = newStartedShard(true);
         indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
@@ -1086,7 +1163,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }

-
     public void testSearchIsReleaseIfWrapperFails() throws IOException {
         IndexShard shard = newStartedShard(true);
         indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
@@ -1117,7 +1193,6 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }

-
     public void testTranslogRecoverySyncsTranslog() throws IOException {
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -1362,8 +1437,4 @@ public class IndexShardTests extends IndexShardTestCase {
         public void verify(String verificationToken, DiscoveryNode localNode) {
         }
     }
-
-    public static Engine getEngineFromShard(IndexShard shard) {
-        return shard.getEngineOrNull();
-    }
 }
```