From e8467f0978154f3419e479d3030348e6bb1c7e1b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 8 Apr 2014 10:28:31 +0200 Subject: [PATCH] Failed shards could be re-assigned to the same nodes if multiple replicas failed at once After a shard fails on a node we assign a new replica on another node. This is important in order to avoid failing again due to node specific problems. In the rare case where two different replicas of the same shard failed in a short time span, we may fail to do so and assign one of them back to the node it's currently on. This happens if both shard failed events are processed within the same batch on the master. Closes #5725 --- .../routing/allocation/RoutingAllocation.java | 17 ++++- .../cluster/IndicesClusterStateService.java | 6 +- .../allocation/FailedShardsRoutingTests.java | 74 ++++++++++++++++--- 3 files changed, 80 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 07c46fe7641..71f2adb430a 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -29,7 +29,9 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.index.shard.ShardId; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * The {@link RoutingAllocation} keep the state of the current allocation @@ -107,7 +109,7 @@ public class RoutingAllocation { private final ClusterInfo clusterInfo; - private Map ignoredShardToNodes = null; + private Map> ignoredShardToNodes = null; private boolean ignoreDisable = false; @@ -199,11 +201,20 @@ public class RoutingAllocation { if (ignoredShardToNodes == null) { ignoredShardToNodes = new HashMap<>(); } - ignoredShardToNodes.put(shardId, nodeId); + Set nodes = ignoredShardToNodes.get(shardId); + if (nodes == null) { + nodes = new HashSet<>(); + ignoredShardToNodes.put(shardId, nodes); + } + nodes.add(nodeId); } public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) { - return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId)); + if (ignoredShardToNodes == null) { + return false; + } + Set nodes = ignoredShardToNodes.get(shardId); + return nodes != null && nodes.contains(nodeId); } /** diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f02edb54321..c7c140f77f4 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -60,11 +60,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.RecoveryFailedException; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.indices.recovery.RecoveryStatus; -import org.elasticsearch.indices.recovery.RecoveryTarget; -import org.elasticsearch.indices.recovery.StartRecoveryRequest; +import org.elasticsearch.indices.recovery.*; import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index bf21cef731e..26abdb60640 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -23,10 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.logging.ESLogger; @@ -34,6 +31,8 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.junit.Test; +import java.util.ArrayList; + import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.hamcrest.Matchers.*; @@ -63,8 +62,8 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("--> adding 2 nodes on same rack and do rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .put(newNode("node1")) - .put(newNode("node2")) + .put(newNode("node1")) + .put(newNode("node2")) ).build(); RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); @@ -85,7 +84,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("--> adding additional node"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .put(newNode("node3")) + .put(newNode("node3")) ).build(); rerouteResult = allocation.reroute(clusterState); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); @@ -101,7 +100,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("--> moving primary shard to node3"); rerouteResult = allocation.reroute(clusterState, new AllocationCommands( - new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) + new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) ); assertThat(rerouteResult.changed(), equalTo(true)); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); @@ -117,7 +116,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { logger.info("--> moving primary shard to node3"); rerouteResult = allocation.reroute(clusterState, new AllocationCommands( - new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) + new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) ); assertThat(rerouteResult.changed(), equalTo(true)); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); @@ -273,6 +272,63 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase { assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); } + @Test + public void singleShardMultipleAllocationFailures() { + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .build()); + + logger.info("Building initial routing table"); + int numberOfReplicas = scaledRandomIntBetween(2, 10); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(numberOfReplicas)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1); + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numberOfReplicas + 1; i++) { + nodeBuilder.put(newNode("node" + Integer.toString(i))); + } + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + while (!clusterState.routingTable().shardsWithState(UNASSIGNED).isEmpty()) { + // start all initializing + clusterState = ClusterState.builder(clusterState) + .routingTable(strategy + .applyStartedShards(clusterState, clusterState.routingTable().shardsWithState(INITIALIZING)).routingTable() + ) + .build(); + // and assign more unassigned + clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build(); + } + + int shardsToFail = randomIntBetween(1, numberOfReplicas); + ArrayList failedShards = new ArrayList<>(); + RoutingNodes routingNodes = clusterState.routingNodes(); + for (int i = 0; i < shardsToFail; i++) { + String n = "node" + Integer.toString(randomInt(numberOfReplicas)); + logger.info("failing shard on node [{}]", n); + ShardRouting shardToFail = routingNodes.node(n).get(0); + failedShards.add(new MutableShardRouting(shardToFail)); + } + + routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); + + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + for (ShardRouting failedShard : failedShards) { + if (!routingNodes.node(failedShard.currentNodeId()).isEmpty()) { + fail("shard " + failedShard + " was re-assigned to it's node"); + } + } + } + @Test public void firstAllocationFailureTwoNodes() { AllocationService strategy = createAllocationService(settingsBuilder()