diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 0e96decf953..bc4ce56fa72 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -774,4 +774,39 @@ public class RecoveryIT extends AbstractRollingTestCase { assertEmptyTranslog(index); } } + + public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception { + final String indexName = "test-auto-expand-filtering"; + final Version minimumNodeVersion = minimumNodeVersion(); + + Response response = client().performRequest(new Request("GET", "_nodes")); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Map nodeMap = objectPath.evaluate("nodes"); + List nodes = new ArrayList<>(nodeMap.keySet()); + + if (CLUSTER_TYPE == ClusterType.OLD) { + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all") + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2))) + .build()); + } + + ensureGreen(indexName); + + final int numberOfReplicas = Integer.parseInt( + getIndexSettingsAsMap(indexName).get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS).toString()); + if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) { + assertEquals(nodes.size() - 2, numberOfReplicas); + } else { + assertEquals(nodes.size() - 1, numberOfReplicas); + } + } + + @SuppressWarnings("unchecked") + private Map getIndexSettingsAsMap(String index) throws IOException { + Map indexSettings = getIndexSettings(index); + return (Map)((Map) indexSettings.get(index)).get("settings"); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java index 346d755c379..ffd1c2a8263 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -105,11 +106,16 @@ public final class AutoExpandReplicas { private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) { if (enabled) { int numMatchingDataNodes = 0; - for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { - Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation); - if (decision.type() != Decision.Type.NO) { - numMatchingDataNodes ++; + // Only start using new logic once all nodes are migrated to 7.6.0, avoiding disruption during an upgrade + if (allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) { + for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { + Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation); + if (decision.type() != Decision.Type.NO) { + numMatchingDataNodes ++; + } } + } else { + numMatchingDataNodes = allocation.nodes().getDataNodes().size(); } final int min = getMinReplicas(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java index f78104201c9..32d25b09fa3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; @@ -32,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.cluster.ClusterStateChanges; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -46,6 +48,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.isIn; @@ -104,12 +107,15 @@ public class AutoExpandReplicasTests extends ESTestCase { private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); - protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) { + protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) { Set roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)); Collections.addAll(roles, mustHaveRoles); final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); - return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, - Version.CURRENT); + return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version); + } + + protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) { + return createNode(Version.CURRENT, mustHaveRoles); } /** @@ -200,4 +206,56 @@ public class AutoExpandReplicasTests extends ESTestCase { terminate(threadPool); } } + + public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() { + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + + try { + List allNodes = new ArrayList<>(); + DiscoveryNode oldNode = createNode(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_5_1), + DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master + allNodes.add(oldNode); + ClusterState state = ClusterStateCreationUtils.state(oldNode, oldNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest("index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build()) + .waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metaData().hasIndex("index")); + while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + logger.info(state); + state = cluster.applyStartedShards(state, + state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING)); + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + + DiscoveryNode newNode = createNode(Version.V_7_6_0, + DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master + + state = cluster.addNodes(state, Collections.singletonList(newNode)); + + // use allocation filtering + state = cluster.updateSettings(state, new UpdateSettingsRequest("index").settings(Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNode.getName()).build())); + + while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + logger.info(state); + state = cluster.applyStartedShards(state, + state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING)); + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + + // check that presence of old node means that auto-expansion does not take allocation filtering into account + assertThat(state.routingTable().index("index").shard(0).size(), equalTo(2)); + + // remove old node and check that auto-expansion takes allocation filtering into account + state = cluster.removeNodes(state, Collections.singletonList(oldNode)); + assertThat(state.routingTable().index("index").shard(0).size(), equalTo(1)); + } finally { + terminate(threadPool); + } + } }