Add flat_skew setting to node overload decider (#3563)

* Add flat_skew setting to node overload decider

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
This commit is contained in:
Rishab Nahata 2022-06-14 10:13:22 +05:30 committed by GitHub
parent e5ad240445
commit 836a9c4910
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 338 additions and 50 deletions

View File

@ -45,14 +45,17 @@ import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
@ -351,4 +354,140 @@ public class AwarenessAllocationIT extends OpenSearchIntegTestCase {
assertThat(counts.get(B_1), equalTo(2));
assertThat(counts.get(noZoneNode), equalTo(2));
}
/**
 * Starts five nodes in each of the zones a, b and c with forced zone awareness, a load-awareness
 * skew factor of 0.0 and a provisioned capacity equal to the full node count. It then:
 * <ol>
 *   <li>creates a 30-primary / 1-replica index and checks every node holds shards and all copies started;</li>
 *   <li>stops half of the zone-a nodes (remembering their data paths) and creates a second identical index;</li>
 *   <li>restarts the stopped nodes under their old names and data paths;</li>
 *   <li>verifies the cluster turns green with every copy of both indices started.</li>
 * </ol>
 */
public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception {
    final int nodeCountPerAZ = 5;
    final int totalNodes = nodeCountPerAZ * 3;
    final int numOfShards = 30;
    final int numOfReplica = 1;
    // primaries plus replicas of a single index
    final int copiesPerIndex = numOfShards * (numOfReplica + 1);

    final Settings commonSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .put("cluster.routing.allocation.load_awareness.skew_factor", "0.0")
        .put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(totalNodes))
        .build();

    logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'");
    final Settings zoneASettings = Settings.builder().put(commonSettings).put("node.attr.zone", "a").build();
    final List<String> zoneANodes = internalCluster().startNodes(nodeCountPerAZ, zoneASettings);
    // Only the zone-a node names are needed later; zones b and c are started for capacity alone.
    final Settings zoneBSettings = Settings.builder().put(commonSettings).put("node.attr.zone", "b").build();
    internalCluster().startNodes(nodeCountPerAZ, zoneBSettings);
    final Settings zoneCSettings = Settings.builder().put(commonSettings).put("node.attr.zone", "c").build();
    internalCluster().startNodes(nodeCountPerAZ, zoneCSettings);

    // Both indices created by this test use 30 primaries and 1 replica.
    final Settings indexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
        .build();
    createIndex("test-1", indexSettings);

    ClusterHealthResponse health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test-1")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForGreenStatus()
        .setWaitForNodes(Integer.toString(totalNodes))
        .setWaitForNoRelocatingShards(true)
        .setWaitForNoInitializingShards(true)
        .execute()
        .actionGet();
    assertFalse(health.isTimedOut());

    ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

    // Tally shard copies per node name.
    final ObjectIntHashMap<String> shardsPerNode = new ObjectIntHashMap<>();
    for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
        for (IndexShardRoutingTable shardTable : indexRoutingTable) {
            for (ShardRouting copy : shardTable) {
                final String nodeName = clusterState.nodes().get(copy.currentNodeId()).getName();
                shardsPerNode.addTo(nodeName, 1);
            }
        }
    }
    // every node received at least one shard copy
    assertThat(shardsPerNode.size(), equalTo(totalNodes));
    // All shards should be started
    assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(copiesPerIndex));

    // Stop half of the nodes in zone a, keeping their data path settings for the later restart.
    final int nodesToStop = nodeCountPerAZ / 2;
    final int remainingNodes = totalNodes - nodesToStop;
    final List<Settings> nodeDataPathSettings = new ArrayList<>();
    for (String nodeName : zoneANodes.subList(0, nodesToStop)) {
        nodeDataPathSettings.add(internalCluster().dataPathSettings(nodeName));
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName));
    }

    client().admin().cluster().prepareReroute().setRetryFailed(true).get();
    // No green-status wait here: only node count and shard movement are awaited while zone a is degraded.
    health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test-1")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNodes(Integer.toString(remainingNodes))
        .setWaitForNoRelocatingShards(true)
        .setWaitForNoInitializingShards(true)
        .execute()
        .actionGet();
    assertFalse(health.isTimedOut());

    // Create a second index of the same shape while zone a is under capacity.
    createIndex("test-2", indexSettings);
    health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test-1", "test-2")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNodes(Integer.toString(remainingNodes))
        .setWaitForNoRelocatingShards(true)
        .setWaitForNoInitializingShards(true)
        .execute()
        .actionGet();
    assertFalse(health.isTimedOut());

    // Bring the stopped nodes back under their original names and data paths.
    for (int idx = 0; idx < nodesToStop; idx++) {
        final Settings restartSettings = Settings.builder()
            .put("node.name", zoneANodes.get(idx))
            .put(nodeDataPathSettings.get(idx))
            .put(commonSettings)
            .put("node.attr.zone", "a")
            .build();
        internalCluster().startNode(restartSettings);
    }

    client().admin().cluster().prepareReroute().setRetryFailed(true).get();
    health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test-1", "test-2")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNodes(Integer.toString(totalNodes))
        .setWaitForGreenStatus()
        .setWaitForActiveShards(2 * copiesPerIndex)
        .setWaitForNoRelocatingShards(true)
        .setWaitForNoInitializingShards(true)
        .execute()
        .actionGet();

    clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
    // All shards should be started now and cluster health should be green
    assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * copiesPerIndex));
    assertThat(health.isTimedOut(), equalTo(false));
}
}

View File

@ -33,11 +33,13 @@ import java.util.function.BiPredicate;
* </pre>
* <p>
* and prevent allocation on the surviving nodes of the under capacity cluster
* based on overload factor defined as a percentage by
* based on an overload factor, defined as a percentage, and a flat skew, defined as an absolute number of extra shards allowed per node, configured by
* </p>
* <pre>
* cluster.routing.allocation.load_awareness.skew_factor: X
* cluster.routing.allocation.load_awareness.flat_skew: N
* </pre>
* The total limit per node based on skew_factor doesn't limit primaries that previously
* The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously
* existed on the disk as those shards are force allocated by
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings.
@ -74,6 +76,13 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
Setting.Property.Dynamic,
Property.NodeScope
);
/**
 * Cluster setting {@code cluster.routing.allocation.load_awareness.flat_skew}: an absolute number of
 * shards that this decider adds on top of the skew_factor-derived per-node shard limit.
 * Registered as a dynamic, node-scoped integer setting.
 * NOTE(review): the two literal 2s are presumably the default value and the minimum allowed value
 * (in that order) of {@code Setting.intSetting} -- confirm against the Setting API.
 */
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting(
"cluster.routing.allocation.load_awareness.flat_skew",
2,
2,
Property.Dynamic,
Property.NodeScope
);
private volatile int provisionedCapacity;
@ -81,12 +90,15 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
private volatile boolean allowUnassignedPrimaries;
private volatile int flatSkew;
private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);
public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
@ -96,6 +108,7 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
this::setAllowUnassignedPrimaries
);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew);
}
private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
@ -110,6 +123,10 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
this.provisionedCapacity = provisionedCapacity;
}
/**
 * Settings-update consumer for the flat_skew cluster setting: stores the new value in the
 * volatile {@code flatSkew} field that is read when the per-node shard limit is computed.
 *
 * @param flatSkew the updated flat skew value
 */
private void setFlatSkew(int flatSkew) {
this.flatSkew = flatSkew;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
@ -146,7 +163,7 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
Metadata metadata = allocation.metadata();
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
int nodeShardCount = node.numberOfOwningShards();
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
if (decider.test(nodeShardCount, limit)) {
logger.debug(
() -> new ParameterizedMessage(
@ -163,10 +180,11 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
Decision.NO,
NAME,
"too many shards [%d] allocated to this node, limit per node [%d] considering"
+ " overload factor [%.2f] based on capacity [%d]",
+ " overload factor [%.2f] and flat skew [%d] based on capacity [%d]",
nodeShardCount,
limit,
skewFactor,
flatSkew,
provisionedCapacity
);
}

View File

@ -556,6 +556,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,

View File

@ -22,7 +22,6 @@ import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.common.settings.Settings;
@ -106,9 +105,11 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
.nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1"))))
.build();
// 4 existing shards from this node's local store get started
// 4 existing shards from this node's local store get started and cluster rebalances
newState = strategy.reroute(newState, "reroute");
newState = startInitializingShardsAndReroute(strategy, newState);
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32));
// add back node2 when skewness is still breached
@ -282,11 +283,14 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build();
newState = strategy.reroute(newState, "reroute");
newState = startInitializingShardsAndReroute(strategy, newState);
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
// 28 shards should be assigned (14 on each node -> 8 * 1.5 + 2)
logger.info("limits should be applied on newly create primaries");
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(16));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12));
assertEquals(
12L,
@ -298,7 +302,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
);
assertEquals(
4L,
0L,
newState.getRoutingNodes()
.shardsWithState(UNASSIGNED)
.stream()
@ -306,7 +310,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
.count()
);
assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12));
assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14));
logger.info("--> Remove node4 from zone holding primaries");
newState = removeNodes(newState, strategy, "node4");
@ -339,10 +343,10 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
logger.info("--> do another reroute, make sure nothing moves");
assertThat(strategy.reroute(newState, "reroute").routingTable(), sameInstance(newState.routingTable()));
assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12));
assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(12));
assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14));
assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(14));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28));
newState = ClusterState.builder(newState)
.nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1"))))
@ -436,7 +440,8 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30));
// Each node can take 12 shards each (2 + ceil(8*1.2))
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36));
for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT);
@ -458,10 +463,12 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build();
newState = strategy.reroute(newState, "reroute");
newState = startInitializingShardsAndReroute(strategy, newState);
while (!newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(66));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14));
logger.info("add another index with 60 shards");
metadata = Metadata.builder(newState.metadata())
@ -482,8 +489,8 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14));
logger.info("change settings to allow unassigned primaries");
strategy = createAllocationServiceWithAdditionalSettings(
@ -499,7 +506,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
);
for (RoutingNode node : newState.getRoutingNodes()) {
assertThat(node.size(), equalTo(40));
assertThat(node.size(), equalTo(42));
}
logger.info("add another index with 5 shards");
@ -513,15 +520,15 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
)
.build();
updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test3")).build();
// increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35 per node and NO primaries get assigned
// since total owning shards are 40 per node already
// increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35+2=37 per node and NO primaries get assigned
// since total owning shards are 42 per node already
newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build();
newState = strategy.reroute(newState, "reroute");
newState = startInitializingShardsAndReroute(strategy, newState);
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(25));
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(19));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(ShardRouting::primary).count(), equalTo(5L));
}
@ -600,21 +607,24 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5));
assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5));
assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(7));
assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(7));
// add the removed node
newState = addNodes(newState, strategy, "zone3", "node11");
assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));
newState = startInitializingShardsAndReroute(strategy, newState);
assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5));
assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(6));
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60));
// add the removed node
newState = addNodes(newState, strategy, "zone3", "node12");
assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));
newState = startInitializingShardsAndReroute(strategy, newState);
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5));
// add the removed node
@ -674,13 +684,14 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
logger.info("--> add five new node in new zone and reroute");
clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10");
// Each node can take 7 shards each now (2 + ceil(4*1.2))
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(30));
logger.info("--> complete relocation");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(55));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(60));
logger.info("--> do another reroute, make sure nothing moves");
assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable()));
@ -707,6 +718,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
// Each node can now have 5 shards each
assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5));
assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5));
@ -791,8 +803,9 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
// ensure minority zone doesn't get overloaded
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(53));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10));
// each node can take 10 shards each (2 + ceil(7*1.1))
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(61));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT);
}
@ -912,15 +925,20 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20));
// assert replicas are not assigned but primaries are
logger.info("--> replicas are not initializing");
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
// Each node can take 11 shards each (2 + ceil(8*1.1)), hence 2 replicas will also start
logger.info("--> 2 replicas are initializing");
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED);
assertFalse(shard.primary());
}
logger.info("--> start the shards (replicas)");
while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
}
logger.info("--> do another reroute, make sure nothing moves");
assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable()));
@ -929,10 +947,12 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18));
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
}
logger.info("--> replicas are started");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(38));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40));
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED);
@ -1012,11 +1032,12 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(50));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10));
// Each node can take 7 shards max ( 2 + ceil(4*1.2))
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
for (RoutingNode node : newState.getRoutingNodes()) {
assertThat(node.size(), equalTo(5));
assertThat(node.size(), equalTo(6));
}
// add the removed node
@ -1025,9 +1046,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
.build();
newState = strategy.reroute(newState, "reroute");
assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));
newState = startInitializingShardsAndReroute(strategy, newState);
assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5));
// add the removed node
newState = ClusterState.builder(newState)
@ -1035,9 +1054,7 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
.build();
newState = strategy.reroute(newState, "reroute");
assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));
newState = startInitializingShardsAndReroute(strategy, newState);
assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5));
// add the removed node
newState = ClusterState.builder(newState)
@ -1068,6 +1085,120 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}
/**
 * Three zones of five nodes each, provisioned capacity 15, skew factor 0 and forced zone values;
 * flat_skew is not set here, so the decider's default applies.
 * <p>
 * Scenario: allocate a 30-shard / 1-replica index across all three zones, remove three of the five
 * zone3 nodes and check nothing is left unassigned, add a second index of the same shape, then
 * re-add the removed nodes one by one and verify that all 120 shard copies are started with
 * 8 copies on each recovered node.
 */
public void testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery() {
    AllocationService strategy = createAllocationServiceWithAdditionalSettings(
        org.opensearch.common.collect.Map.of(
            NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),
            15,
            NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(),
            0,
            "cluster.routing.allocation.awareness.force.zone.values",
            "zone1,zone2,zone3"
        )
    );

    logger.info("Building initial routing table for 'testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery'");
    final Metadata initialMetadata = Metadata.builder()
        .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(30).numberOfReplicas(1))
        .build();
    final RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(initialMetadata.index("test")).build();

    ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
        .metadata(initialMetadata)
        .routingTable(initialRoutingTable)
        .build();

    logger.info("--> adding five nodes on same zone and do rerouting");
    state = addNodes(state, strategy, "zone1", "node1", "node2", "node3", "node4", "node5");
    state = strategy.reroute(state, "reroute");
    assertThat(state.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30));

    logger.info("--> start the shards (primaries)");
    state = startInitializingShardsAndReroute(strategy, state);

    logger.info("--> add five new node in new zone and reroute");
    state = addNodes(state, strategy, "zone2", "node6", "node7", "node8", "node9", "node10");

    logger.info("--> complete relocation");
    state = startInitializingShardsAndReroute(strategy, state);

    state = addNodes(state, strategy, "zone3", "node11", "node12", "node13", "node14", "node15");
    // keep starting shards until nothing is left initializing
    while (!state.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) {
        state = startInitializingShardsAndReroute(strategy, state);
    }

    assertThat(state.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60));
    assertThat(state.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));

    // every zone3 node is expected to hold 4 shard copies at this point
    for (String zone3Node : new String[] { "node11", "node12", "node13", "node14", "node15" }) {
        assertThat(state.getRoutingNodes().node(zone3Node).size(), equalTo(4));
    }

    logger.info("--> Removing three nodes from zone3");
    state = removeNodes(state, strategy, "node11", "node12", "node13");
    while (!state.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) {
        state = startInitializingShardsAndReroute(strategy, state);
    }

    // Each node can take 6 shards max (2 + ceil(4*1.0)), so all shards should be assigned
    assertThat(state.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60));
    assertThat(state.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));

    logger.info("add another index with 30 primary 1 replica");
    final Metadata expandedMetadata = Metadata.builder(state.metadata())
        .put(
            IndexMetadata.builder("test1")
                .settings(
                    settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 30)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                )
        )
        .build();
    final RoutingTable expandedRoutingTable = RoutingTable.builder(state.routingTable())
        .addAsNew(expandedMetadata.index("test1"))
        .build();

    state = ClusterState.builder(state).metadata(expandedMetadata).routingTable(expandedRoutingTable).build();
    state = strategy.reroute(state, "reroute");
    state = startInitializingShardsAndReroute(strategy, state);

    // Re-add the first two removed nodes; each gets one reroute followed by a single start round.
    for (String returningNode : new String[] { "node11", "node12" }) {
        state = ClusterState.builder(state)
            .nodes(DiscoveryNodes.builder(state.nodes()).add(newNode(returningNode, singletonMap("zone", "zone3"))))
            .build();
        state = strategy.reroute(state, "reroute");
        state = startInitializingShardsAndReroute(strategy, state);
    }

    // Re-add the last removed node, then drain every remaining initializing shard.
    state = ClusterState.builder(state)
        .nodes(DiscoveryNodes.builder(state.nodes()).add(newNode("node13", singletonMap("zone", "zone3"))))
        .build();
    state = strategy.reroute(state, "reroute");
    while (!state.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) {
        state = startInitializingShardsAndReroute(strategy, state);
    }

    // the recovered nodes are each expected to hold 8 shard copies
    for (String recoveredNode : new String[] { "node13", "node12", "node11" }) {
        assertThat(state.getRoutingNodes().node(recoveredNode).size(), equalTo(8));
    }

    // ensure all shards are assigned
    assertThat(state.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120));
    assertThat(state.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}
private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) {
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes());
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId));
@ -1097,7 +1228,6 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase {
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "zone");
settingsValue.forEach((k, v) -> {
if (v instanceof Integer) settingsBuilder.put(k, (Integer) (v));