diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index e93d071b0cf..8268b98f34d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -320,14 +320,23 @@ public class RoutingNodes implements Iterable<RoutingNode> {
     /**
      * Returns one active replica shard for the given shard id or null if
      * no active replica is found.
+     *
+     * Since replicas could possibly be on nodes with an older version of ES than
+     * the primary is, this will return replicas on the highest version of ES.
+     *
      */
-    public ShardRouting activeReplica(ShardId shardId) {
-        for (ShardRouting shardRouting : assignedShards(shardId)) {
-            if (!shardRouting.primary() && shardRouting.active()) {
-                return shardRouting;
-            }
-        }
-        return null;
+    public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
+        // It's possible for the node's version to be null: when dead nodes are deassociated,
+        // the shards on them are failed, and part of the shard-failing logic calls this
+        // method with an out-of-date RoutingNodes, where the version might not be
+        // accessible. Therefore, we need to protect against the version being null
+        // (meaning the node will be going away).
+        return assignedShards(shardId).stream()
+                .filter(shr -> !shr.primary() && shr.active())
+                .filter(shr -> node(shr.currentNodeId()) != null)
+                .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
+                        Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
+                .orElse(null);
     }
 
     /**
@@ -567,7 +576,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         if (failedShard.relocatingNodeId() == null) {
             if (failedShard.primary()) {
                 // promote active replica to primary if active replica exists (only the case for shadow replicas)
-                ShardRouting activeReplica = activeReplica(failedShard.shardId());
+                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                 if (activeReplica == null) {
                     moveToUnassigned(failedShard, unassignedInfo);
                 } else {
@@ -596,7 +605,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         assert failedShard.active();
         if (failedShard.primary()) {
             // promote active replica to primary if active replica exists
-            ShardRouting activeReplica = activeReplica(failedShard.shardId());
+            ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
             if (activeReplica == null) {
                 moveToUnassigned(failedShard, unassignedInfo);
             } else {
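
Side note (not part of the change itself): the selection above leans on Comparator.nullsFirst so that a replica whose DiscoveryNode has already gone away compares lowest, and max() only falls back to it when no replica with a live node exists. Below is a minimal, self-contained sketch of that semantics (Java 16+ records; Node and Replica are hypothetical stand-ins, not the real ES types):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    class HighestVersionPick {
        // Hypothetical stand-ins for DiscoveryNode / ShardRouting.
        record Node(String id, int version) {}
        record Replica(String allocationId, Node node) {} // node may be null while it is being removed

        static Optional<Replica> pick(List<Replica> activeReplicas) {
            return activeReplicas.stream()
                // nullsFirst: a null node sorts lowest, so max() prefers any replica on a live node
                .max(Comparator.comparing(Replica::node,
                    Comparator.nullsFirst(Comparator.comparingInt(Node::version))));
        }

        public static void main(String[] args) {
            List<Replica> replicas = List.of(
                new Replica("a1", new Node("n1", 5)),
                new Replica("a2", null),             // node already deassociated
                new Replica("a3", new Node("n2", 6)));
            // Prints Replica[allocationId=a3, node=Node[id=n2, version=6]]: the highest live version wins.
            System.out.println(pick(replicas).orElseThrow());
        }
    }
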
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
index 61a28897d58..3b551e91294 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
@@ -19,20 +19,50 @@
 package org.elasticsearch.cluster.routing.allocation;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase;
+import org.elasticsearch.indices.cluster.ClusterStateChanges;
+import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.hamcrest.Matchers.equalTo;
@@ -91,4 +121,120 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase {
             assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1));
         }
     }
+
+    public void testRandomClusterPromotesNewestReplica() throws InterruptedException {
+
+        ThreadPool threadPool = new TestThreadPool(getClass().getName());
+        ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+        ClusterState state = randomInitialClusterState();
+
+        // randomly add nodes of mixed versions
+        logger.info("--> adding random nodes");
+        for (int i = 0; i < randomIntBetween(4, 8); i++) {
+            DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
+                .add(createNode()).build();
+            state = ClusterState.builder(state).nodes(newNodes).build();
+            state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding node
+        }
+
+        // Log the node versions (for debugging if necessary)
+        for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
+            Version nodeVer = cursor.value.getVersion();
+            logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
+        }
+
+        // randomly create some indices
+        logger.info("--> creating some indices");
+        for (int i = 0; i < randomIntBetween(2, 5); i++) {
+            String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
+            Settings.Builder settingsBuilder = Settings.builder()
+                .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
+                .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
+            CreateIndexRequest request = new CreateIndexRequest(name,
+                settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
+            state = cluster.createIndex(state, request);
+            assertTrue(state.metaData().hasIndex(name));
+        }
+
+        ClusterState previousState = state;
+
+        logger.info("--> starting shards");
+        state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+        logger.info("--> starting replicas a random number of times");
+        for (int i = 0; i < randomIntBetween(1, 10); i++) {
+            state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+        }
+
+        boolean keepGoing = true;
+        while (keepGoing) {
+            List<ShardRouting> primaries = state.getRoutingNodes().shardsWithState(STARTED)
+                .stream().filter(ShardRouting::primary).collect(Collectors.toList());
+
+            // Pick a random subset of primaries to fail
+            List<FailedShard> shardsToFail = new ArrayList<>();
+            List<ShardRouting> failedPrimaries = randomSubsetOf(primaries);
+            failedPrimaries.stream().forEach(sr -> {
+                shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception()));
+            });
+
+            logger.info("--> state before failing shards: {}", state);
+            state = cluster.applyFailedShards(state, shardsToFail);
+
+            final ClusterState compareState = state;
+            failedPrimaries.forEach(shardRouting -> {
+                logger.info("--> verifying version for {}", shardRouting);
+
+                ShardRouting newPrimary = compareState.routingTable().index(shardRouting.index())
+                    .shard(shardRouting.id()).primaryShard();
+                Version newPrimaryVersion = getNodeVersion(newPrimary, compareState);
+
+                logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary);
+                compareState.routingTable().shardRoutingTable(newPrimary.shardId()).shardsWithState(STARTED)
+                    .stream()
+                    .forEach(sr -> {
+                        Version candidateVer = getNodeVersion(sr, compareState);
+                        if (candidateVer != null) {
+                            logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr);
+                            assertTrue("candidate was not on the newest version, new primary is on " +
+                                    newPrimaryVersion + " and there is a candidate on " + candidateVer,
+                                candidateVer.onOrBefore(newPrimaryVersion));
+                        }
+                    });
+            });
+
+            keepGoing = randomBoolean();
+        }
+        terminate(threadPool);
+    }
+
+    private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
+        return Optional.ofNullable(state.getNodes().get(shardRouting.currentNodeId())).map(DiscoveryNode::getVersion).orElse(null);
+    }
+
+    private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
+
+    public ClusterState randomInitialClusterState() {
+        List<DiscoveryNode> allNodes = new ArrayList<>();
+        DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+        allNodes.add(localNode);
+        // at least two nodes that have the data role so that we can allocate shards
+        allNodes.add(createNode(DiscoveryNode.Role.DATA));
+        allNodes.add(createNode(DiscoveryNode.Role.DATA));
+        for (int i = 0; i < randomIntBetween(2, 5); i++) {
+            allNodes.add(createNode());
+        }
+        ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+        return state;
+    }
+
+    protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+        Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+        for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+            roles.add(mustHaveRole);
+        }
+        final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+            VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
+    }
 }
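
Aside (not part of the diff): the per-candidate assertion in the randomized test above boils down to a single invariant — after a failed primary is replaced, the promoted primary must sit on a node whose version is on-or-after that of every node still holding an active copy of the shard. A minimal sketch of that predicate, assuming only org.elasticsearch.Version (onOrAfter is the same method the tests use); the class and method names are hypothetical, not part of the codebase:

    import java.util.Collection;
    import org.elasticsearch.Version;

    final class PromotionInvariant {
        // Hypothetical helper mirroring the per-shard assertion in testRandomClusterPromotesNewestReplica:
        // the new primary must be on a node at least as new as every other active copy's node.
        static boolean newestReplicaPromoted(Version newPrimaryNodeVersion, Collection<Version> activeCopyNodeVersions) {
            return activeCopyNodeVersions.stream().allMatch(newPrimaryNodeVersion::onOrAfter);
        }
    }
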
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
index 6063faba156..2eedeba63f3 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
@@ -19,12 +19,14 @@
 package org.elasticsearch.cluster.routing.allocation;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +37,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.VersionUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -499,7 +502,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
             Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
         assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
         assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
+        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
 
         // fail the primary shard, check replicas get removed as well...
@@ -556,4 +559,119 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
         assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
     }
+
+    public void testReplicaOnNewestVersionIsPromoted() {
+        AllocationService allocation = createAllocationService(Settings.builder().build());
+
+        MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
+            .settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)).build();
+
+        RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
+
+        ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData).routingTable(initialRoutingTable).build();
+
+        ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
+
+        // add a single node
+        clusterState = ClusterState.builder(clusterState).nodes(
+            DiscoveryNodes.builder()
+                .add(newNode("node1-5.x", Version.V_5_6_0)))
+            .build();
+        clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+        // start primary shard
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+        // add another 5.6 node
+        clusterState = ClusterState.builder(clusterState).nodes(
+            DiscoveryNodes.builder(clusterState.nodes())
+                .add(newNode("node2-5.x", Version.V_5_6_0)))
+            .build();
+
+        // start the shards, should have 1 primary and 1 replica available
+        clusterState = allocation.reroute(clusterState, "reroute");
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
+
+        clusterState = ClusterState.builder(clusterState).nodes(
+            DiscoveryNodes.builder(clusterState.nodes())
+                .add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))
+                .add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))))
+            .build();
+
+        // start all the replicas
+        clusterState = allocation.reroute(clusterState, "reroute");
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
+
+        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+        logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);
+
+        // fail the primary shard again and make sure the correct replica is promoted
+        ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        assertThat(newState, not(equalTo(clusterState)));
+        clusterState = newState;
+        // the primary gets allocated on another node
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));
+
+        ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
+        assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+        Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+        assertNotNull(replicaNodeVersion);
+        logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+        for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+            if (primaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
+                // Skip the node that the failed primary was on, it no longer has a replica so doesn't need a version check
+                continue;
+            }
+            Version nodeVer = cursor.value.getVersion();
+            assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be on or before " + replicaNodeVersion,
+                replicaNodeVersion.onOrAfter(nodeVer));
+        }
+
+        startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+        logger.info("--> failing primary shard a second time, should select: {}", startedReplica);
+
+        // fail the primary shard again, and ensure the same thing happens
+        ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+        assertThat(newState, not(equalTo(clusterState)));
+        clusterState = newState;
+        // the primary gets allocated on another node
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+
+        newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail)));
+        assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+        replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+        assertNotNull(replicaNodeVersion);
+        logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+        for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+            if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) ||
+                secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
+                // Skip the nodes that the failed primaries were on, they no longer have a replica so don't need a version check
+                continue;
+            }
+            Version nodeVer = cursor.value.getVersion();
+            assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be on or before " + replicaNodeVersion,
+                replicaNodeVersion.onOrAfter(nodeVer));
+        }
+    }
 }