Revert "Only auto-expand replicas with allocation filtering when all nodes upgraded (#50361)"
This reverts commit df4fe73b84
.
This commit is contained in:
parent
7c10e9b0e7
commit
5f37f1f401
|
@ -774,39 +774,4 @@ public class RecoveryIT extends AbstractRollingTestCase {
|
||||||
assertEmptyTranslog(index);
|
assertEmptyTranslog(index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception {
|
|
||||||
final String indexName = "test-auto-expand-filtering";
|
|
||||||
final Version minimumNodeVersion = minimumNodeVersion();
|
|
||||||
|
|
||||||
Response response = client().performRequest(new Request("GET", "_nodes"));
|
|
||||||
ObjectPath objectPath = ObjectPath.createFromResponse(response);
|
|
||||||
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
|
|
||||||
List<String> nodes = new ArrayList<>(nodeMap.keySet());
|
|
||||||
|
|
||||||
if (CLUSTER_TYPE == ClusterType.OLD) {
|
|
||||||
createIndex(indexName, Settings.builder()
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
|
||||||
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
|
|
||||||
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2)))
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureGreen(indexName);
|
|
||||||
|
|
||||||
final int numberOfReplicas = Integer.parseInt(
|
|
||||||
getIndexSettingsAsMap(indexName).get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS).toString());
|
|
||||||
if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) {
|
|
||||||
assertEquals(nodes.size() - 2, numberOfReplicas);
|
|
||||||
} else {
|
|
||||||
assertEquals(nodes.size() - 1, numberOfReplicas);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
|
|
||||||
Map<String, Object> indexSettings = getIndexSettings(index);
|
|
||||||
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.elasticsearch.cluster.metadata;
|
package org.elasticsearch.cluster.metadata;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
@ -106,17 +105,12 @@ public final class AutoExpandReplicas {
|
||||||
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
|
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
int numMatchingDataNodes = 0;
|
int numMatchingDataNodes = 0;
|
||||||
// Only start using new logic once all nodes are migrated to 7.6.0, avoiding disruption during an upgrade
|
|
||||||
if (allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) {
|
|
||||||
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
|
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
|
||||||
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
|
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
|
||||||
if (decision.type() != Decision.Type.NO) {
|
if (decision.type() != Decision.Type.NO) {
|
||||||
numMatchingDataNodes ++;
|
numMatchingDataNodes ++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
numMatchingDataNodes = allocation.nodes().getDataNodes().size();
|
|
||||||
}
|
|
||||||
|
|
||||||
final int min = getMinReplicas();
|
final int min = getMinReplicas();
|
||||||
final int max = getMaxReplicas(numMatchingDataNodes);
|
final int max = getMaxReplicas(numMatchingDataNodes);
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.cluster.metadata;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
|
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -33,7 +32,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.indices.cluster.ClusterStateChanges;
|
import org.elasticsearch.indices.cluster.ClusterStateChanges;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.VersionUtils;
|
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
@ -48,7 +46,6 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
|
||||||
import static org.hamcrest.Matchers.everyItem;
|
import static org.hamcrest.Matchers.everyItem;
|
||||||
import static org.hamcrest.Matchers.isIn;
|
import static org.hamcrest.Matchers.isIn;
|
||||||
|
|
||||||
|
@ -107,15 +104,12 @@ public class AutoExpandReplicasTests extends ESTestCase {
|
||||||
|
|
||||||
private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
|
private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
|
||||||
|
|
||||||
protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) {
|
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
|
||||||
Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
|
Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
|
||||||
Collections.addAll(roles, mustHaveRoles);
|
Collections.addAll(roles, mustHaveRoles);
|
||||||
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
|
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
|
||||||
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version);
|
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
|
||||||
}
|
Version.CURRENT);
|
||||||
|
|
||||||
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
|
|
||||||
return createNode(Version.CURRENT, mustHaveRoles);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -206,56 +200,4 @@ public class AutoExpandReplicasTests extends ESTestCase {
|
||||||
terminate(threadPool);
|
terminate(threadPool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() {
|
|
||||||
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
|
||||||
final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
|
|
||||||
|
|
||||||
try {
|
|
||||||
List<DiscoveryNode> allNodes = new ArrayList<>();
|
|
||||||
DiscoveryNode oldNode = createNode(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_5_1),
|
|
||||||
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master
|
|
||||||
allNodes.add(oldNode);
|
|
||||||
ClusterState state = ClusterStateCreationUtils.state(oldNode, oldNode, allNodes.toArray(new DiscoveryNode[0]));
|
|
||||||
|
|
||||||
CreateIndexRequest request = new CreateIndexRequest("index",
|
|
||||||
Settings.builder()
|
|
||||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
|
||||||
.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
|
|
||||||
.waitForActiveShards(ActiveShardCount.NONE);
|
|
||||||
state = cluster.createIndex(state, request);
|
|
||||||
assertTrue(state.metaData().hasIndex("index"));
|
|
||||||
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
|
|
||||||
logger.info(state);
|
|
||||||
state = cluster.applyStartedShards(state,
|
|
||||||
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
|
|
||||||
state = cluster.reroute(state, new ClusterRerouteRequest());
|
|
||||||
}
|
|
||||||
|
|
||||||
DiscoveryNode newNode = createNode(Version.V_7_6_0,
|
|
||||||
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master
|
|
||||||
|
|
||||||
state = cluster.addNodes(state, Collections.singletonList(newNode));
|
|
||||||
|
|
||||||
// use allocation filtering
|
|
||||||
state = cluster.updateSettings(state, new UpdateSettingsRequest("index").settings(Settings.builder()
|
|
||||||
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNode.getName()).build()));
|
|
||||||
|
|
||||||
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
|
|
||||||
logger.info(state);
|
|
||||||
state = cluster.applyStartedShards(state,
|
|
||||||
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
|
|
||||||
state = cluster.reroute(state, new ClusterRerouteRequest());
|
|
||||||
}
|
|
||||||
|
|
||||||
// check that presence of old node means that auto-expansion does not take allocation filtering into account
|
|
||||||
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(2));
|
|
||||||
|
|
||||||
// remove old node and check that auto-expansion takes allocation filtering into account
|
|
||||||
state = cluster.removeNodes(state, Collections.singletonList(oldNode));
|
|
||||||
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(1));
|
|
||||||
} finally {
|
|
||||||
terminate(threadPool);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue