From d5f028e0853609a61ee59199784d6abad547191c Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Mon, 14 May 2018 20:12:52 +0200
Subject: [PATCH] Auto-expand replicas only after failing nodes (#30553)

#30423 combined auto-expansion with node removal in the same cluster state
update. As the auto-expansion step would run before deassociating the dead
nodes from the routing table, the auto-expansion could remove replicas from
live nodes instead of dead ones. This commit reverses the order to ensure
that, when nodes leave the cluster, the auto-expand-replica functionality
only triggers after the shards on the removed nodes have been failed. This
ensures that active shards on other live nodes are not failed if the primary
resided on a now-dead node. Instead, one of the replicas on the live nodes
first gets promoted to primary, and the auto-expansion (removing replicas)
only triggers in a follow-up step (but still within the same cluster state
update).

Relates to #30456 and follow-up of #30423
---
 .../routing/allocation/AllocationService.java |  42 +++---
 .../discovery/zen/NodeJoinController.java     |   4 +-
 .../metadata/AutoExpandReplicasTests.java     | 128 ++++++++++++++++++
 .../indices/cluster/ClusterStateChanges.java  |  10 ++
 4 files changed, 164 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index deb10b83b5a..569ddd6cee7 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -114,11 +114,24 @@ public class AllocationService extends AbstractComponent {
     }
 
     protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
-        RoutingTable oldRoutingTable = oldState.routingTable();
-        RoutingNodes newRoutingNodes = allocation.routingNodes();
+        ClusterState newState = buildResult(oldState, allocation);
+
+        logClusterHealthStateChange(
+            new ClusterStateHealth(oldState),
+            new ClusterStateHealth(newState),
+            reason
+        );
+
+        return newState;
+    }
+
+    private ClusterState buildResult(ClusterState oldState, RoutingAllocation allocation) {
+        final RoutingTable oldRoutingTable = oldState.routingTable();
+        final RoutingNodes newRoutingNodes = allocation.routingNodes();
         final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
-        MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
+        final MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
         assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
+
         final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState)
             .routingTable(newRoutingTable)
             .metaData(newMetaData);
@@ -131,13 +144,7 @@ public class AllocationService extends AbstractComponent {
                 newStateBuilder.customs(customsBuilder.build());
             }
         }
-        final ClusterState newState = newStateBuilder.build();
-        logClusterHealthStateChange(
-            new ClusterStateHealth(oldState),
-            new ClusterStateHealth(newState),
-            reason
-        );
-        return newState;
+        return newStateBuilder.build();
     }
 
     // Used for testing
@@ -209,24 +216,23 @@
      * if needed.
      */
     public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
-        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
-        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
+        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         routingNodes.unassigned().shuffle();
-        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
             clusterInfoService.getClusterInfo(), currentNanoTime());
 
         // first, clear from the shards any node id they used to belong to that is now dead
         deassociateDeadNodes(allocation);
 
-        if (reroute) {
-            reroute(allocation);
+        if (allocation.routingNodesChanged()) {
+            clusterState = buildResult(clusterState, allocation);
         }
-
-        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
+        if (reroute) {
+            return reroute(clusterState, reason);
+        } else {
             return clusterState;
         }
-        return buildResultAndLogHealthChange(clusterState, allocation, reason);
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
index e59fc8ad513..5cceba237e5 100644
--- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
+++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
@@ -380,7 +380,7 @@ public class NodeJoinController extends AbstractComponent {
     /**
      * a task indicating that the current node should become master, if no current master is known
      */
-    private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
+    public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
         new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
         @Override
@@ -393,7 +393,7 @@ public class NodeJoinController extends AbstractComponent {
      * a task that is used to signal the election is stopped and we should process pending joins.
      * it may be used in combination with {@link #BECOME_MASTER_TASK}
      */
-    private static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
+    public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
         new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
         @Override
         public String toString() {
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java
index 32312f34e21..f24dbfbd002 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java
@@ -18,8 +18,36 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.cluster.ClusterStateChanges;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
 
 public class AutoExpandReplicasTests extends ESTestCase {
 
@@ -72,4 +100,104 @@
         }
     }
+
+    private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
+
+    protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+        Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+        for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+            roles.add(mustHaveRole);
+        }
+        final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+            Version.CURRENT);
+    }
+
+    /**
+     * Checks that, when nodes leave the cluster, the auto-expand-replica functionality only triggers after failing the shards on
+     * the removed nodes. This ensures that active shards on other live nodes are not failed if the primary resided on a now-dead node.
+     * Instead, one of the replicas on the live nodes first gets promoted to primary, and the auto-expansion (removing replicas) only
+     * triggers in a follow-up step.
+     */
+    public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedException {
+        final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+        final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+
+        try {
+            List<DiscoveryNode> allNodes = new ArrayList<>();
+            DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+            allNodes.add(localNode);
+            int numDataNodes = randomIntBetween(3, 5);
+            List<DiscoveryNode> dataNodes = new ArrayList<>(numDataNodes);
+            for (int i = 0; i < numDataNodes; i++) {
+                dataNodes.add(createNode(DiscoveryNode.Role.DATA));
+            }
+            allNodes.addAll(dataNodes);
+            ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+
+            CreateIndexRequest request = new CreateIndexRequest("index",
+                Settings.builder()
+                    .put(SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
+                .waitForActiveShards(ActiveShardCount.NONE);
+            state = cluster.createIndex(state, request);
+            assertTrue(state.metaData().hasIndex("index"));
+            while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
+                logger.info(state);
+                state = cluster.applyStartedShards(state,
+                    state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
+                state = cluster.reroute(state, new ClusterRerouteRequest());
+            }
+
+            IndexShardRoutingTable preTable = state.routingTable().index("index").shard(0);
+            final Set<String> unchangedNodeIds;
+            final IndexShardRoutingTable postTable;
+
+            if (randomBoolean()) {
+                // simulate node removal
+                List<DiscoveryNode> nodesToRemove = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> nodesToRemove.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                state = cluster.removeNodes(state, nodesToRemove);
+                postTable = state.routingTable().index("index").shard(0);
+
+                assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted());
+                assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(isIn(preTable.getAllAllocationIds())));
+            } else {
+                // fake an election where conflicting nodes are removed and readded
+                state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build();
+
+                List<DiscoveryNode> conflictingNodes = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> conflictingNodes.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                List<DiscoveryNode> nodesToAdd = conflictingNodes.stream()
+                    .map(n -> new DiscoveryNode(n.getName(), n.getId(), buildNewFakeTransportAddress(), n.getAttributes(), n.getRoles(), n.getVersion()))
+                    .collect(Collectors.toList());
+
+                if (randomBoolean()) {
+                    nodesToAdd.add(createNode(DiscoveryNode.Role.DATA));
+                }
+
+                state = cluster.joinNodesAndBecomeMaster(state, nodesToAdd);
+                postTable = state.routingTable().index("index").shard(0);
+            }
+
+            Set<String> unchangedAllocationIds = preTable.getShards().stream().filter(shr -> unchangedNodeIds.contains(shr.currentNodeId()))
+                .map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());
+
+            assertThat(postTable.toString(), unchangedAllocationIds, everyItem(isIn(postTable.getAllAllocationIds())));
+
+            postTable.getShards().forEach(
+                shardRouting -> {
+                    if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) {
+                        assertTrue("Shard should be active: " + shardRouting,
+                            shardRouting.active());
+                    }
+                }
+            );
+        } finally {
+            terminate(threadPool);
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
index 9e8638af249..8bfd08244e4 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
@@ -87,6 +87,7 @@ import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -232,6 +233,15 @@ public class ClusterStateChanges extends AbstractComponent {
         return runTasks(joinTaskExecutor, clusterState, nodes);
     }
 
+    public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
+        List<DiscoveryNode> joinNodes = new ArrayList<>();
+        joinNodes.add(NodeJoinController.BECOME_MASTER_TASK);
+        joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK);
+        joinNodes.addAll(nodes);
+
+        return runTasks(joinTaskExecutor, clusterState, joinNodes);
+    }
+
     public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
         return runTasks(nodeRemovalExecutor, clusterState, nodes.stream()
             .map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList()));
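
A condensed view of the change for reviewers: the sketch below paraphrases the new
flow of AllocationService#deassociateDeadNodes introduced by this patch (simplified,
not the exact production code). Shards on the removed nodes are failed first,
promoting replicas on live nodes to primary where needed; only the subsequent
reroute applies the auto-expand adjustment, so it can no longer remove replicas
from live nodes while the primary still appears to sit on a dead node.

    // Sketch of the new order of operations (simplified):
    deassociateDeadNodes(allocation);                              // 1. fail shards on dead nodes, promote replicas
    if (allocation.routingNodesChanged()) {
        clusterState = buildResult(clusterState, allocation);      // 2. intermediate state without the dead nodes
    }
    if (reroute) {
        return reroute(clusterState, reason);                      // 3. follow-up step: auto-expansion runs here,
    } else {                                                       //    still within the same cluster state update
        return clusterState;
    }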
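
For context on the setting the new test exercises: index.auto_expand_replicas takes
a "min-max" range, and the replica count is automatically adjusted within that range
as data nodes join and leave ("all" expands to all other data nodes). A minimal
sketch of creating such an index, mirroring the test above (the index name here is
illustrative):

    // One primary shard; "0-all" lets the replica count track the data-node count.
    CreateIndexRequest request = new CreateIndexRequest("my-index",
        Settings.builder()
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)           // single primary shard
            .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all") // replicas expand to all other data nodes
            .build());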