From 01ca81e2a37c9cd38c6dc44ae567d427c059dbe0 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 11 Jul 2014 09:11:51 +0200 Subject: [PATCH] Improve handling of failed primary replica handling Out of #6808, we improved the handling of a primary failing to make sure replicas that are initializing are properly failed as well. After double checking it, it has 2 problems, the first, if the same shard routing is failed again, there is no protection that we don't apply the failure (which we do in failed shard cases), and the other was that we already tried to handle it (wrongly) in the elect primary method. This change fixes the handling to work correctly in the elect primary method, and adds unit tests to verify the behavior The change also expose a problem in our handling of replica shards that stay initializing during primary failure and electing another replica shard as primary, where we need to cancel its ongoing recovery to make sure it re-starts from the new elected primary closes #6825 --- .../cluster/routing/RoutingNodes.java | 16 ++++ .../routing/allocation/AllocationService.java | 65 +++++-------- .../cluster/IndicesClusterStateService.java | 80 +++++++++++----- .../indices/recovery/RecoveryStatus.java | 9 +- .../indices/recovery/RecoveryTarget.java | 2 +- .../allocation/FailedShardsRoutingTests.java | 94 +++++++++++++++++++ 6 files changed, 195 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 87049555a8f..c22a05d2f10 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -310,6 +310,12 @@ public class RoutingNodes implements Iterable { for (RoutingNode routingNode : this) { shards.addAll(routingNode.shardsWithState(state)); } + for (ShardRoutingState s : state) { + if (s == ShardRoutingState.UNASSIGNED) { + Iterables.addAll(shards, unassigned()); + break; + } + } return shards; } @@ -319,6 +325,16 @@ public class RoutingNodes implements Iterable { for (RoutingNode routingNode : this) { shards.addAll(routingNode.shardsWithState(index, state)); } + for (ShardRoutingState s : state) { + if (s == ShardRoutingState.UNASSIGNED) { + for (MutableShardRouting unassignedShard : unassignedShards) { + if (unassignedShard.index().equals(index)) { + shards.add(unassignedShard); + } + } + break; + } + } return shards; } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b8431fac624..603ee7b8570 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -192,7 +192,7 @@ public class AllocationService extends AbstractComponent { // elect primaries *before* allocating unassigned, so backups of primaries that failed // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*) - changed |= electPrimariesAndUnassignDanglingReplicas(allocation); + changed |= electPrimariesAndUnassignedDanglingReplicas(allocation); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable()); @@ -210,13 +210,13 @@ public class AllocationService extends AbstractComponent { // elect primaries *before* allocating unassigned, so backups of primaries that failed // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*) - changed |= electPrimariesAndUnassignDanglingReplicas(allocation); + changed |= electPrimariesAndUnassignedDanglingReplicas(allocation); // now allocate all the unassigned to available nodes if (allocation.routingNodes().hasUnassigned()) { changed |= shardsAllocators.allocateUnassigned(allocation); // elect primaries again, in case this is needed with unassigned allocation - changed |= electPrimariesAndUnassignDanglingReplicas(allocation); + changed |= electPrimariesAndUnassignedDanglingReplicas(allocation); } // move shards that no longer can be allocated @@ -269,13 +269,31 @@ public class AllocationService extends AbstractComponent { return changed; } - private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) { + private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) { boolean changed = false; RoutingNodes routingNodes = allocation.routingNodes(); if (!routingNodes.hasUnassignedPrimaries()) { // move out if we don't have unassigned primaries return changed; } + + // go over and remove dangling replicas that are initializing for primary shards + List shardsToFail = Lists.newArrayList(); + for (MutableShardRouting shardEntry : routingNodes.unassigned()) { + if (shardEntry.primary()) { + for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) { + if (!routing.primary() && routing.initializing()) { + shardsToFail.add(routing); + } + } + } + } + for (ShardRouting shardToFail : shardsToFail) { + changed |= applyFailedShard(allocation, shardToFail, false); + } + + // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, + // routingNodes.hasUnassignedPrimaries() will potentially be false for (MutableShardRouting shardEntry : routingNodes.unassigned()) { if (shardEntry.primary()) { MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry); @@ -298,28 +316,6 @@ public class AllocationService extends AbstractComponent { } } - // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones... - List shardsToFail = null; - if (routingNodes.hasUnassignedPrimaries()) { - for (MutableShardRouting shardEntry : routingNodes.unassigned()) { - if (shardEntry.primary()) { - for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) { - if (!routing.primary()) { - changed = true; - if (shardsToFail == null) { - shardsToFail = new ArrayList<>(); - } - shardsToFail.add(routing); - } - } - } - } - if (shardsToFail != null) { - for (ShardRouting shardToFail : shardsToFail) { - applyFailedShard(allocation, shardToFail, false); - } - } - } return changed; } @@ -421,23 +417,6 @@ public class AllocationService extends AbstractComponent { RoutingNodes routingNodes = allocation.routingNodes(); boolean dirty = false; - if (failedShard.primary()) { - // we have to fail the initializing replicas if the primary fails - // since they might now yet have started the recovery and then they will - // stick in the cluster-state forever since the replica has a retry logic that - // retries infinitely in that case. - List initializingReplicas = new ArrayList<>(); - for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){ - if (!shard.primary() && shard.initializing()) { - initializingReplicas.add(shard); - } - } - // we can't do this in the loop above since we modify the iterator and will get - // concurrent modification exceptions - for (MutableShardRouting shard : initializingReplicas) { - dirty |= applyFailedShard(allocation, shard, addToIgnoreList); - } - } if (failedShard.relocatingNodeId() != null) { // the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node) if (failedShard.state() == INITIALIZING) { diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 3b3b0b9b51a..f7f4816ad42 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,6 +24,7 @@ import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; @@ -541,6 +542,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent openIndexOutputs = ConcurrentCollections.newConcurrentMap(); public final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums(); + public DiscoveryNode sourceNode() { + return this.sourceNode; + } + public RecoveryState recoveryState() { return recoveryState; } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 94d1b276195..eaecbd9e159 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -163,7 +163,7 @@ public class RecoveryTarget extends AbstractComponent { return; } // create a new recovery status, and process... - final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard); + final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard, request.sourceNode()); recoveryStatus.recoveryState.setType(request.recoveryType()); recoveryStatus.recoveryState.setSourceNode(request.sourceNode()); recoveryStatus.recoveryState.setTargetNode(request.targetNode()); diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 26abdb60640..663b6903b20 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import com.google.common.collect.ImmutableList; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -482,4 +483,97 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { // make sure the failedShard is not INITIALIZING again on node3 assertThat(routingNodes.node("node3").get(0).shardId(), not(equalTo(shardToFail.shardId()))); } + + @Test + public void testFailAllReplicasInitializingOnPrimaryFail() { + AllocationService allocation = createAllocationService(settingsBuilder() + .build()); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + // add 4 nodes + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build(); + clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); + // start primary shards + clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + // fail the primary shard, check replicas get removed as well... + ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); + RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(routingResult.changed(), equalTo(true)); + clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build(); + // the primary gets allocated on another node, replicas are unassigned + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); + + ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); + + // start the primary shard + clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it + routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(routingResult.changed(), equalTo(false)); + } + + @Test + public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() { + AllocationService allocation = createAllocationService(settingsBuilder() + .build()); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + // add 4 nodes + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build(); + clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); + // start primary shards + clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + // start another replica shard, while keep one initializing + clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, ImmutableList.of(clusterState.routingNodes().shardsWithState(INITIALIZING).get(0))).routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + // fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it) + ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); + RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(routingResult.changed(), equalTo(true)); + clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); + + // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it + routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(routingResult.changed(), equalTo(false)); + } }