Only auto-expand replicas with allocation filtering when all nodes upgraded (#50361)

Follow-up to #48974 that ensures that replicas are only auto-expanded according to allocation
filtering rules once all nodes are upgraded to a version that supports this. Helps with
orchestrating cluster upgrades.
This commit is contained in:
Yannick Welsch 2019-12-20 10:22:23 +01:00
parent 40bce49a7f
commit 4f805deb0c
3 changed files with 106 additions and 7 deletions

View File

@ -774,4 +774,39 @@ public class RecoveryIT extends AbstractRollingTestCase {
assertEmptyTranslog(index);
}
}
public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception {
final String indexName = "test-auto-expand-filtering";
final Version minimumNodeVersion = minimumNodeVersion();
Response response = client().performRequest(new Request("GET", "_nodes"));
ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
List<String> nodes = new ArrayList<>(nodeMap.keySet());
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2)))
.build());
}
ensureGreen(indexName);
final int numberOfReplicas = Integer.parseInt(
getIndexSettingsAsMap(indexName).get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS).toString());
if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) {
assertEquals(nodes.size() - 2, numberOfReplicas);
} else {
assertEquals(nodes.size() - 1, numberOfReplicas);
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -105,12 +106,17 @@ public final class AutoExpandReplicas {
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
// Only start using new logic once all nodes are migrated to 7.6.0, avoiding disruption during an upgrade
if (allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
if (decision.type() != Decision.Type.NO) {
numMatchingDataNodes ++;
}
}
} else {
numMatchingDataNodes = allocation.nodes().getDataNodes().size();
}
final int min = getMinReplicas();
final int max = getMaxReplicas(numMatchingDataNodes);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
@ -32,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -46,6 +48,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.isIn;
@ -104,12 +107,15 @@ public class AutoExpandReplicasTests extends ESTestCase {
private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) {
Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
Collections.addAll(roles, mustHaveRoles);
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
Version.CURRENT);
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version);
}
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
return createNode(Version.CURRENT, mustHaveRoles);
}
/**
@ -200,4 +206,56 @@ public class AutoExpandReplicasTests extends ESTestCase {
terminate(threadPool);
}
}
public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() {
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
try {
List<DiscoveryNode> allNodes = new ArrayList<>();
DiscoveryNode oldNode = createNode(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_5_1),
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master
allNodes.add(oldNode);
ClusterState state = ClusterStateCreationUtils.state(oldNode, oldNode, allNodes.toArray(new DiscoveryNode[0]));
CreateIndexRequest request = new CreateIndexRequest("index",
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
.waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex("index"));
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
logger.info(state);
state = cluster.applyStartedShards(state,
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
state = cluster.reroute(state, new ClusterRerouteRequest());
}
DiscoveryNode newNode = createNode(Version.V_7_6_0,
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master
state = cluster.addNodes(state, Collections.singletonList(newNode));
// use allocation filtering
state = cluster.updateSettings(state, new UpdateSettingsRequest("index").settings(Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNode.getName()).build()));
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
logger.info(state);
state = cluster.applyStartedShards(state,
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
state = cluster.reroute(state, new ClusterRerouteRequest());
}
// check that presence of old node means that auto-expansion does not take allocation filtering into account
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(2));
// remove old node and check that auto-expansion takes allocation filtering into account
state = cluster.removeNodes(state, Collections.singletonList(oldNode));
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(1));
} finally {
terminate(threadPool);
}
}
}