[1.x] An allocation constraint mechanism, that de-prioritizes nodes from getting picked for allocation if they breach certain constraints (#777)
* An allocation constraint mechanism, that de-prioritizes nodes from getting picked for allocation if they breach certain constraints Signed-off-by: Ashwin Pankaj <appankaj@amazon.com> * Precommit fixes Signed-off-by: Ashwin Pankaj <appankaj@amazon.com> * Fix license header in test file Signed-off-by: Ashwin Pankaj <appankaj@amazon.com> * Review comments Signed-off-by: Ashwin Pankaj <appankaj@amazon.com> Co-authored-by: Pankaj <appankaj@88665a1e205d.ant.amazon.com>
This commit is contained in:
parent
4f5b48f722
commit
b4c697a29d
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright OpenSearch Contributors.
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing.allocation;
|
||||
|
||||
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* Allocation constraints specify conditions which, if breached, reduce the
|
||||
* priority of a node for receiving shard allocations.
|
||||
*/
|
||||
public class AllocationConstraints {
|
||||
public final long CONSTRAINT_WEIGHT = 1000000L;
|
||||
private List<Predicate<ConstraintParams>> constraintPredicates;
|
||||
|
||||
public AllocationConstraints() {
|
||||
this.constraintPredicates = new ArrayList<>(1);
|
||||
this.constraintPredicates.add(isIndexShardsPerNodeBreached());
|
||||
}
|
||||
|
||||
class ConstraintParams {
|
||||
private BalancedShardsAllocator.Balancer balancer;
|
||||
private BalancedShardsAllocator.ModelNode node;
|
||||
private String index;
|
||||
|
||||
ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
|
||||
String index) {
|
||||
this.balancer = balancer;
|
||||
this.node = node;
|
||||
this.index = index;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates configured allocation constraint predicates for given node - index
|
||||
* combination; and returns a weight value based on the number of breached
|
||||
* constraints.
|
||||
*
|
||||
* Constraint weight should be added to the weight calculated via weight
|
||||
* function, to reduce priority of allocating on nodes with breached
|
||||
* constraints.
|
||||
*
|
||||
* This weight function is used only in case of unassigned shards to avoid overloading a newly added node.
|
||||
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function.
|
||||
*/
|
||||
public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
|
||||
String index) {
|
||||
int constraintsBreached = 0;
|
||||
ConstraintParams params = new ConstraintParams(balancer, node, index);
|
||||
for (Predicate<ConstraintParams> predicate : constraintPredicates) {
|
||||
if (predicate.test(params)) {
|
||||
constraintsBreached++;
|
||||
}
|
||||
}
|
||||
return constraintsBreached * CONSTRAINT_WEIGHT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constraint to control number of shards of an index allocated on a single
|
||||
* node.
|
||||
*
|
||||
* In current weight function implementation, when a node has significantly
|
||||
* fewer shards than other nodes (e.g. during single new node addition or node
|
||||
* replacement), its weight is much less than other nodes. All shard allocations
|
||||
* at this time tend to land on the new node with skewed weight. This breaks
|
||||
* index level balance in the cluster, by creating all shards of the same index
|
||||
* on one node, often resulting in a hotspot on that node.
|
||||
*
|
||||
* This constraint is breached when balancer attempts to allocate more than
|
||||
* average shards per index per node.
|
||||
*/
|
||||
private Predicate<ConstraintParams> isIndexShardsPerNodeBreached() {
|
||||
return (params) -> {
|
||||
int currIndexShardsOnNode = params.node.numShards(params.index);
|
||||
int allowedIndexShardsPerNode = (int) Math.ceil(params.balancer.avgShardsPerNode(params.index));
|
||||
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -45,6 +45,7 @@ import org.opensearch.cluster.routing.ShardRoutingState;
|
|||
import org.opensearch.cluster.routing.UnassignedInfo;
|
||||
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
|
||||
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
|
||||
import org.opensearch.cluster.routing.allocation.AllocationDecision;
|
||||
import org.opensearch.cluster.routing.allocation.MoveDecision;
|
||||
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
|
||||
|
@ -192,7 +193,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
return weightFunction.shardBalance;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
|
||||
* Currently this function has 3 properties:
|
||||
|
@ -216,13 +216,16 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
* </li>
|
||||
* </ul>
|
||||
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
|
||||
*
|
||||
* package-private for testing
|
||||
*/
|
||||
private static class WeightFunction {
|
||||
static class WeightFunction {
|
||||
|
||||
private final float indexBalance;
|
||||
private final float shardBalance;
|
||||
private final float theta0;
|
||||
private final float theta1;
|
||||
private AllocationConstraints constraints;
|
||||
|
||||
WeightFunction(float indexBalance, float shardBalance) {
|
||||
float sum = indexBalance + shardBalance;
|
||||
|
@ -233,6 +236,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
theta1 = indexBalance / sum;
|
||||
this.indexBalance = indexBalance;
|
||||
this.shardBalance = shardBalance;
|
||||
this.constraints = new AllocationConstraints();
|
||||
}
|
||||
|
||||
public float weightWithAllocationConstraints(Balancer balancer, ModelNode node, String index) {
|
||||
float balancerWeight = weight(balancer, node, index);
|
||||
return balancerWeight + constraints.weight(balancer, node, index);
|
||||
}
|
||||
|
||||
float weight(Balancer balancer, ModelNode node, String index) {
|
||||
|
@ -411,7 +420,10 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
|
||||
// calculate the delta of the weights of the two nodes if we were to add the shard to the
|
||||
// node in question and move it away from the node that currently holds it.
|
||||
boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
|
||||
// hence we add 2.0f to the weight delta
|
||||
float proposedDelta = 2.0f + nodeWeight - currentWeight;
|
||||
boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
|
||||
|
||||
rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
|
||||
// if the simulated weight delta with the shard moved away is better than the weight delta
|
||||
// with the shard remaining on the current node, and we are allowed to allocate to the
|
||||
|
@ -964,7 +976,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
}
|
||||
|
||||
// weight of this index currently on the node
|
||||
float currentWeight = weight.weight(this, node, shard.getIndexName());
|
||||
float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName());
|
||||
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
|
||||
if (currentWeight > minWeight && explain == false) {
|
||||
continue;
|
||||
|
@ -1002,7 +1014,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
updateMinNode = currentDecision.type() == Type.YES;
|
||||
}
|
||||
} else {
|
||||
updateMinNode = true;
|
||||
updateMinNode = currentWeight < minWeight;
|
||||
}
|
||||
if (updateMinNode) {
|
||||
minNode = node;
|
||||
|
@ -1086,7 +1098,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|||
|
||||
}
|
||||
|
||||
static class ModelNode implements Iterable<ModelIndex> {
|
||||
public static class ModelNode implements Iterable<ModelIndex> {
|
||||
private final Map<String, ModelIndex> indices = new HashMap<>();
|
||||
private int numShards = 0;
|
||||
private final RoutingNode routingNode;
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing.allocation;
|
||||
|
||||
import org.opensearch.cluster.OpenSearchAllocationTestCase;
|
||||
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class AllocationConstraintsTests extends OpenSearchAllocationTestCase {
|
||||
|
||||
public void testSettings() {
|
||||
Settings.Builder settings = Settings.builder();
|
||||
ClusterSettings service = new ClusterSettings(Settings.builder().build(),
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service);
|
||||
|
||||
settings = Settings.builder();
|
||||
float indexBalanceFactor = randomFloat();
|
||||
float shardBalance = randomFloat();
|
||||
float threshold = randomFloat();
|
||||
settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor);
|
||||
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance);
|
||||
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold);
|
||||
|
||||
service.applySettings(settings.build());
|
||||
|
||||
assertEquals(indexBalanceFactor, allocator.getIndexBalance(), 0.01);
|
||||
assertEquals(shardBalance, allocator.getShardBalance(), 0.01);
|
||||
assertEquals(threshold, allocator.getThreshold(), 0.01);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test constraint evaluation logic when with different values of ConstraintMode
|
||||
* for IndexShardPerNode constraint satisfied and breached.
|
||||
*/
|
||||
public void testIndexShardsPerNodeConstraint() {
|
||||
BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class);
|
||||
BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class);
|
||||
AllocationConstraints constraints = new AllocationConstraints();
|
||||
|
||||
int shardCount = randomIntBetween(1, 500);
|
||||
float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f;
|
||||
|
||||
when(balancer.avgShardsPerNode(anyString())).thenReturn(avgShardsPerNode);
|
||||
when(node.numShards(anyString())).thenReturn(shardCount);
|
||||
when(node.getNodeId()).thenReturn("test-node");
|
||||
|
||||
long expectedWeight = (shardCount >= avgShardsPerNode) ? constraints.CONSTRAINT_WEIGHT : 0;
|
||||
assertEquals(expectedWeight, constraints.weight(balancer, node, "index"));
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing.allocation;
|
||||
|
||||
import org.opensearch.cluster.ClusterInfo;
|
||||
import org.opensearch.cluster.ClusterInfoService;
|
||||
import org.opensearch.cluster.ClusterState;
|
||||
import org.opensearch.cluster.DiskUsage;
|
||||
import org.opensearch.cluster.OpenSearchAllocationWithConstraintsTestCase;
|
||||
import org.opensearch.cluster.node.DiscoveryNodes;
|
||||
import org.opensearch.cluster.routing.ShardRouting;
|
||||
import org.opensearch.common.collect.ImmutableOpenMap;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.test.VersionUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
||||
|
||||
public class IndexShardConstraintDeciderOverlapTests extends OpenSearchAllocationWithConstraintsTestCase {
|
||||
|
||||
/**
|
||||
* High watermark breach blocks new shard allocations to affected nodes. If shard count on such
|
||||
* nodes is low, this will cause IndexShardPerNodeConstraint to breach.
|
||||
*
|
||||
* This test verifies that this doesn't lead to unassigned shards, and there are no hot spots in eligible
|
||||
* nodes.
|
||||
*/
|
||||
public void testHighWatermarkBreachWithLowShardCount() {
|
||||
setupInitialCluster(3, 15, 10, 1);
|
||||
addNodesWithIndexing(1, "high_watermark_node_", 6, 5, 1);
|
||||
|
||||
// Disk threshold settings enabled
|
||||
Settings settings = Settings.builder()
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8)
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), 0.95)
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
|
||||
.put("cluster.routing.allocation.cluster_concurrent_recoveries", 1)
|
||||
.build();
|
||||
|
||||
// Build Shard size and disk usages
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
|
||||
usagesBuilder.put("node_0", new DiskUsage("node_0", "node_0", "/dev/null", 100, 80)); // 20% used
|
||||
usagesBuilder.put("node_1", new DiskUsage("node_1", "node_1", "/dev/null", 100, 55)); // 45% used
|
||||
usagesBuilder.put("node_2", new DiskUsage("node_2", "node_2", "/dev/null", 100, 35)); // 65% used
|
||||
usagesBuilder.put("high_watermark_node_0",
|
||||
new DiskUsage("high_watermark_node_0", "high_watermark_node_0", "/dev/null", 100, 10)); // 90% used
|
||||
|
||||
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
|
||||
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
|
||||
clusterState.getRoutingTable().allShards().forEach(shard ->
|
||||
shardSizesBuilder.put(shardIdentifierFromRouting(shard), 1L)); // Each shard is 1 byte
|
||||
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
|
||||
|
||||
final ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace =
|
||||
new ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace>()
|
||||
.fPut(
|
||||
getNodeAndDevNullPath("node_0"), getReservedSpace()
|
||||
)
|
||||
.fPut(
|
||||
getNodeAndDevNullPath("node_1"), getReservedSpace()
|
||||
)
|
||||
.fPut(
|
||||
getNodeAndDevNullPath("node_2"), getReservedSpace()
|
||||
)
|
||||
.fPut(
|
||||
getNodeAndDevNullPath("high_watermark_node_0"), getReservedSpace()
|
||||
)
|
||||
.build();
|
||||
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, reservedSpace);
|
||||
ClusterInfoService cis = () -> clusterInfo;
|
||||
allocation = createAllocationService(settings, cis);
|
||||
|
||||
allocateAndCheckIndexShardHotSpots(false, 3, "node_0", "node_1", "node_2");
|
||||
assertForIndexShardHotSpots(true, 4);
|
||||
assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
|
||||
assertTrue(clusterState.getRoutingNodes().node("high_watermark_node_0").isEmpty());
|
||||
|
||||
|
||||
/* Shard sizes that would breach high watermark on node_2 if allocated.
|
||||
*/
|
||||
addIndices("big_index_", 1, 10, 0);
|
||||
ImmutableOpenMap.Builder<String, Long> bigIndexShardSizeBuilder = ImmutableOpenMap.builder(shardSizes);
|
||||
clusterState.getRoutingNodes().unassigned()
|
||||
.forEach(shard -> bigIndexShardSizeBuilder.put(shardIdentifierFromRouting(shard), 20L));
|
||||
shardSizes = bigIndexShardSizeBuilder.build();
|
||||
final ClusterInfo bigIndexClusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, reservedSpace);
|
||||
cis = () -> bigIndexClusterInfo;
|
||||
allocation = createAllocationService(settings, cis);
|
||||
|
||||
allocateAndCheckIndexShardHotSpots(false, 2, "node_0", "node_1");
|
||||
assertForIndexShardHotSpots(true, 4);
|
||||
assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
|
||||
for (ShardRouting shard: clusterState.getRoutingTable().index("big_index_0").shardsWithState(STARTED)) {
|
||||
assertNotEquals("node_2", shard.currentNodeId());
|
||||
}
|
||||
}
|
||||
|
||||
private ClusterInfo.NodeAndPath getNodeAndDevNullPath(String node) {
|
||||
return new ClusterInfo.NodeAndPath(node, "/dev/null");
|
||||
}
|
||||
|
||||
private ClusterInfo.ReservedSpace getReservedSpace() {
|
||||
return new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), 2).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test clusters with subset of nodes on older version.
|
||||
* New version shards should not migrate to old version nodes, even if this creates potential hot spots.
|
||||
*/
|
||||
public void testNodeVersionCompatibilityOverlap() {
|
||||
setupInitialCluster(3, 6, 10, 1);
|
||||
|
||||
// Add an old version node and exclude a new version node
|
||||
DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes())
|
||||
.add(newNode("old_node", VersionUtils.getPreviousVersion()));
|
||||
clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build();
|
||||
buildAllocationService("node_0");
|
||||
|
||||
// Shards should only go to remaining new version nodes
|
||||
allocateAndCheckIndexShardHotSpots(false, 2, "node_1", "node_2");
|
||||
assertForIndexShardHotSpots(true, 4);
|
||||
assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
|
||||
|
||||
for (ShardRouting shard: clusterState.getRoutingTable().allShards()) {
|
||||
assertNotEquals("node_0", shard.currentNodeId());
|
||||
assertNotEquals("old_node", shard.currentNodeId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test zone aware clusters with balanced zones.
|
||||
* No hot spots expected.
|
||||
*/
|
||||
public void testZoneBalanced() {
|
||||
Map<String, Integer> nodesPerZone = new HashMap<>();
|
||||
nodesPerZone.put("zone_0", 3);
|
||||
nodesPerZone.put("zone_1", 3);
|
||||
createEmptyZoneAwareCluster(nodesPerZone);
|
||||
addIndices("index_", 4, 5, 1);
|
||||
|
||||
buildZoneAwareAllocationService();
|
||||
allocateAndCheckIndexShardHotSpots(false, 6);
|
||||
|
||||
resetCluster();
|
||||
buildZoneAwareAllocationService();
|
||||
allocateAndCheckIndexShardHotSpots(false, 6);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test zone aware clusters with unbalanced zones.
|
||||
* Hot spots expected as awareness forces shards per zone restrictions.
|
||||
*/
|
||||
public void testZoneUnbalanced() {
|
||||
Map<String, Integer> nodesPerZone = new HashMap<>();
|
||||
nodesPerZone.put("zone_0", 5);
|
||||
nodesPerZone.put("zone_1", 1);
|
||||
createEmptyZoneAwareCluster(nodesPerZone);
|
||||
addIndices("index_", 1, 5, 1);
|
||||
updateInitialCluster();
|
||||
|
||||
|
||||
buildZoneAwareAllocationService();
|
||||
clusterState = allocateShardsAndBalance(clusterState);
|
||||
assertForIndexShardHotSpots(true, 6);
|
||||
assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
|
||||
|
||||
resetCluster();
|
||||
buildZoneAwareAllocationService();
|
||||
clusterState = allocateShardsAndBalance(clusterState);
|
||||
assertForIndexShardHotSpots(true, 6);
|
||||
assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* ClusterInfo that always points to DevNull.
|
||||
*/
|
||||
public static class DevNullClusterInfo extends ClusterInfo {
|
||||
public DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
|
||||
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
|
||||
ImmutableOpenMap<String, Long> shardSizes,
|
||||
ImmutableOpenMap<NodeAndPath, ReservedSpace> reservedSpace) {
|
||||
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDataPath(ShardRouting shardRouting) {
|
||||
return "/dev/null";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.routing.allocation;
|
||||
|
||||
import org.opensearch.cluster.OpenSearchAllocationWithConstraintsTestCase;
|
||||
|
||||
public class IndexShardHotSpotTests extends OpenSearchAllocationWithConstraintsTestCase {
|
||||
|
||||
/**
|
||||
* Test single node replacement without active indexing.
|
||||
*/
|
||||
public void testNodeReplacement() {
|
||||
setupInitialCluster(5, 1, 5, 1);
|
||||
terminateNodes("node_1");
|
||||
assertForIndexShardHotSpots(false, 4);
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
|
||||
|
||||
setupInitialCluster(5, 1, 5, 1);
|
||||
terminateNodes("node_1");
|
||||
assertForIndexShardHotSpots(false, 4);
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
|
||||
assertTrue(movesForModeUnassigned <= movesForModeNone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test single node replacement with active indexing.
|
||||
*/
|
||||
public void testNodeReplacementWithIndexing() {
|
||||
setupInitialCluster(5, 30, 5, 1);
|
||||
buildAllocationService();
|
||||
terminateNodes("node_1");
|
||||
assertForIndexShardHotSpots(false, 4);
|
||||
addNodesWithIndexing(1, "new_node_", 3, 20, 1);
|
||||
int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
|
||||
|
||||
resetCluster();
|
||||
buildAllocationService();
|
||||
terminateNodes("node_1");
|
||||
assertForIndexShardHotSpots(false, 4);
|
||||
addNodesWithIndexing(1, "new_node_", 3, 20, 1);
|
||||
int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
|
||||
assertTrue(movesForModeUnassigned <= movesForModeNone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test skewed cluster scale out via single node addition during active indexing.
|
||||
*/
|
||||
public void testSkewedClusterScaleOut() {
|
||||
setupInitialCluster(3, 30, 10, 1);
|
||||
buildAllocationService();
|
||||
addNodesWithIndexing(1, "new_node_", 8, 10, 1);
|
||||
int movesForModeNone = allocateAndCheckIndexShardHotSpots( false, 4, "new_node_0");
|
||||
|
||||
resetCluster();
|
||||
buildAllocationService();
|
||||
addNodesWithIndexing(1, "new_node_", 8, 10, 1);
|
||||
int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
|
||||
assertTrue(movesForModeUnassigned <= movesForModeNone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test under replicated yellow cluster scale out to green.
|
||||
*
|
||||
* This scenario is not expected to create hotspots even without constraints enabled. The
|
||||
* test is a sanity check to ensure allocation constraints don't worsen the situation.
|
||||
*/
|
||||
public void testUnderReplicatedClusterScaleOut() {
|
||||
setupInitialCluster(3, 30, 10, 3);
|
||||
buildAllocationService();
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
|
||||
|
||||
resetCluster();
|
||||
buildAllocationService();
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
|
||||
assertTrue(movesForModeUnassigned <= movesForModeNone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test cluster scale in scenario, when nodes are gracefully excluded from
|
||||
* cluster before termination.
|
||||
*
|
||||
* During moveShards(), shards are picked from across indexes in an interleaved manner.
|
||||
* This prevents hot spots by evenly picking up shards. Since shard order can change
|
||||
* in subsequent runs, we are not guaranteed to less moves than no allocation constraint run.
|
||||
*
|
||||
* Move tests are hence just a sanity test, to ensure we don't create any unexpected hot spots with
|
||||
* allocation settings.
|
||||
*/
|
||||
public void testClusterScaleIn() {
|
||||
setupInitialCluster(4, 30, 10, 1);
|
||||
buildAllocationService("node_0,node_1");
|
||||
allocateAndCheckIndexShardHotSpots(false, 2, "node_2", "node_3");
|
||||
|
||||
resetCluster();
|
||||
buildAllocationService("node_0,node_1");
|
||||
allocateAndCheckIndexShardHotSpots(false, 2, "node_2", "node_3");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test cluster scale in scenario with skewed shard distribution in remaining nodes.
|
||||
*/
|
||||
public void testClusterScaleInWithSkew() {
|
||||
setupInitialCluster(4, 100, 5, 1);
|
||||
buildAllocationService("node_0,node_1");
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
allocateAndCheckIndexShardHotSpots(false, 3, "node_2", "node_3", "new_node_0");
|
||||
|
||||
resetCluster();
|
||||
buildAllocationService("node_0,node_1");
|
||||
addNodesWithoutIndexing(1, "new_node_");
|
||||
allocateAndCheckIndexShardHotSpots(false, 3, "node_2", "node_3", "new_node_0");
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ package org.opensearch.cluster.routing.allocation;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.cluster.ClusterState;
|
||||
import org.opensearch.cluster.OpenSearchAllocationTestCase;
|
||||
|
@ -48,7 +49,13 @@ import org.opensearch.common.settings.Settings;
|
|||
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
|
||||
public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase {
|
||||
private final Logger logger = LogManager.getLogger(ShardsLimitAllocationTests.class);
|
||||
|
@ -201,13 +208,20 @@ public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase {
|
|||
|
||||
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
|
||||
|
||||
assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), STARTED), equalTo(10));
|
||||
// AllocationConstraints will choose node2 for 3 test1 shards and the rest are assigned to node1
|
||||
// These shards then relocate to node2 after balanceByWeights() is invoked
|
||||
assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), STARTED), equalTo(8));
|
||||
assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), RELOCATING), equalTo(2));
|
||||
|
||||
// Expected values depend on which index shards were chosen for relocation.
|
||||
String relocatingIndex = getRelocatingIndex(clusterState);
|
||||
Matcher<String> node1Matcher = anyOf(equalTo("test"), equalTo("test1"));
|
||||
Matcher<String> node2Matcher = relocatingIndex.equals("test1") ? equalTo("test1") : node1Matcher;
|
||||
for (ShardRouting shardRouting : clusterState.getRoutingNodes().node("node1")) {
|
||||
assertThat(shardRouting.getIndexName(), equalTo("test"));
|
||||
assertThat(shardRouting.getIndexName(), node1Matcher);
|
||||
}
|
||||
for (ShardRouting shardRouting : clusterState.getRoutingNodes().node("node2")) {
|
||||
assertThat(shardRouting.getIndexName(), equalTo("test1"));
|
||||
assertThat(shardRouting.getIndexName(), node2Matcher);
|
||||
}
|
||||
|
||||
logger.info("update {} for test, see that things move",
|
||||
|
@ -226,10 +240,12 @@ public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase {
|
|||
logger.info("reroute after setting");
|
||||
clusterState = strategy.reroute(clusterState, "reroute");
|
||||
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3));
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(2));
|
||||
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(2));
|
||||
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3));
|
||||
int node1RelocCount = relocatingIndex.equals("test1") ? 4 : 2;
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(node1RelocCount));
|
||||
|
||||
int node2RelocCount = relocatingIndex.equals("test1") ? 2 : 0;
|
||||
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(node2RelocCount));
|
||||
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3 - node2RelocCount));
|
||||
// the first move will destroy the balance and the balancer will move 2 shards from node2 to node one right after
|
||||
// moving the nodes to node2 since we consider INITIALIZING nodes during rebalance
|
||||
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
|
||||
|
@ -237,4 +253,13 @@ public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(5));
|
||||
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5));
|
||||
}
|
||||
|
||||
private String getRelocatingIndex(ClusterState clusterState) {
|
||||
final Set<String> indices = Stream.of(clusterState.getRoutingNodes())
|
||||
.flatMap(node -> node.shardsWithState(RELOCATING).stream().map(x -> x.getIndexName()))
|
||||
.collect(Collectors.toSet());
|
||||
assertThat(indices.size(), equalTo(1));
|
||||
return indices.iterator().next();
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,336 @@
|
|||
/*
|
||||
* Copyright OpenSearch Contributors.
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster;
|
||||
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.cluster.metadata.IndexMetadata;
|
||||
import org.opensearch.cluster.metadata.Metadata;
|
||||
import org.opensearch.cluster.node.DiscoveryNodes;
|
||||
import org.opensearch.cluster.routing.RoutingNode;
|
||||
import org.opensearch.cluster.routing.RoutingTable;
|
||||
import org.opensearch.cluster.routing.ShardRouting;
|
||||
import org.opensearch.cluster.routing.UnassignedInfo;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.util.CollectionUtils;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
|
||||
public abstract class OpenSearchAllocationWithConstraintsTestCase extends OpenSearchAllocationTestCase {
|
||||
|
||||
protected MockAllocationService allocation;
|
||||
private ClusterState initialClusterState;
|
||||
protected ClusterState clusterState;
|
||||
private HashMap<String, Integer> indexShardCount;
|
||||
private HashMap<String, HashMap<String, Integer>> nodeShardsByIndex;
|
||||
private static final int MAX_REROUTE_STEPS_ALLOWED = 1500;
|
||||
|
||||
@Before
|
||||
public void clearState() {
|
||||
allocation = null;
|
||||
initialClusterState = null;
|
||||
clusterState = null;
|
||||
indexShardCount = null;
|
||||
nodeShardsByIndex = null;
|
||||
}
|
||||
|
||||
public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
|
||||
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
|
||||
}
|
||||
|
||||
public void buildAllocationService() {
|
||||
Settings.Builder sb = Settings.builder();
|
||||
buildAllocationService(sb);
|
||||
}
|
||||
|
||||
public void buildAllocationService(String excludeNodes) {
|
||||
logger.info("Excluding nodes: [{}]", excludeNodes);
|
||||
Settings.Builder sb = Settings.builder().put("cluster.routing.allocation.exclude._node_id", excludeNodes);
|
||||
|
||||
buildAllocationService(sb);
|
||||
}
|
||||
|
||||
public void buildZoneAwareAllocationService() {
|
||||
logger.info("Creating zone aware cluster");
|
||||
Settings.Builder sb = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone");
|
||||
|
||||
buildAllocationService(sb);
|
||||
}
|
||||
|
||||
public void buildAllocationService(Settings.Builder sb) {
|
||||
sb.put("cluster.routing.allocation.node_concurrent_recoveries", 1);
|
||||
sb.put("cluster.routing.allocation.cluster_concurrent_rebalance", 1);
|
||||
allocation = createAllocationService(sb.build());
|
||||
}
|
||||
|
||||
public Metadata buildMetadata(Metadata.Builder mb, String indexPrefix, int numberOfIndices, int numberOfShards,
|
||||
int numberOfReplicas) {
|
||||
for (int i = 0; i < numberOfIndices; i++) {
|
||||
mb.put(IndexMetadata.builder(indexPrefix + i)
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
|
||||
.numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas));
|
||||
}
|
||||
return mb.build();
|
||||
}
|
||||
|
||||
public RoutingTable buildRoutingTable(RoutingTable.Builder rb, Metadata metadata, String indexPrefix,
|
||||
int numberOfIndices) {
|
||||
for (int i = 0; i < numberOfIndices; i++) {
|
||||
rb.addAsNew(metadata.index(indexPrefix + i));
|
||||
}
|
||||
return rb.build();
|
||||
}
|
||||
|
||||
public DiscoveryNodes addNodes(DiscoveryNodes.Builder nb, String nodePrefix, int nodesAdded) {
|
||||
for (int i = 0; i < nodesAdded; i++) {
|
||||
nb.add(newNode(nodePrefix + i, singletonMap("_node_id", nodePrefix + i)));
|
||||
}
|
||||
return nb.build();
|
||||
}
|
||||
|
||||
public void resetCluster() {
|
||||
clusterState = initialClusterState;
|
||||
}
|
||||
|
||||
public void updateInitialCluster() {
|
||||
initialClusterState = clusterState;
|
||||
}
|
||||
|
||||
public void createEmptyZoneAwareCluster(Map<String, Integer> nodesPerZone) {
|
||||
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
|
||||
int nodeId = 0;
|
||||
for (String zone : nodesPerZone.keySet()) {
|
||||
for (int i = 0; i < nodesPerZone.get(zone); i++) {
|
||||
nb.add(newNode("node_" + nodeId, singletonMap("zone", zone)));
|
||||
nodeId++;
|
||||
}
|
||||
}
|
||||
initialClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(nb.build()).build();
|
||||
clusterState = initialClusterState;
|
||||
}
|
||||
|
||||
public void setupInitialCluster(int nodeCount, int indices, int shards, int replicas) {
|
||||
final String DEFAULT_INDEX_PREFIX = "index_";
|
||||
final String DEFAULT_NODE_PREFIX = "node_";
|
||||
|
||||
Metadata metadata = buildMetadata(Metadata.builder(), DEFAULT_INDEX_PREFIX, indices, shards, replicas);
|
||||
RoutingTable routingTable = buildRoutingTable(RoutingTable.builder(), metadata, DEFAULT_INDEX_PREFIX, indices);
|
||||
DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(), DEFAULT_NODE_PREFIX, nodeCount);
|
||||
initialClusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable)
|
||||
.nodes(nodes).build();
|
||||
|
||||
buildAllocationService();
|
||||
initialClusterState = allocateShardsAndBalance(initialClusterState);
|
||||
clusterState = initialClusterState;
|
||||
|
||||
indexShardCount = new HashMap<>();
|
||||
for (int i = 0; i < indices; i++) {
|
||||
indexShardCount.put(DEFAULT_INDEX_PREFIX + i, shards * (replicas + 1));
|
||||
}
|
||||
|
||||
assertForIndexShardHotSpots(false, nodeCount); // Initial cluster should be balanced
|
||||
logger.info("Initial cluster created...");
|
||||
}
|
||||
|
||||
public void buildNodeShardsByIndex() {
|
||||
nodeShardsByIndex = new HashMap<>();
|
||||
for (RoutingNode rn : clusterState.getRoutingNodes()) {
|
||||
String node = rn.nodeId();
|
||||
nodeShardsByIndex.put(node, new HashMap<>());
|
||||
for (ShardRouting shard : rn) {
|
||||
assert shard.currentNodeId().equals(node);
|
||||
if (shard.state() == INITIALIZING || shard.state() == STARTED) {
|
||||
nodeShardsByIndex.get(node).merge(shard.getIndexName(), 1, Integer::sum);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isIndexHotSpotPresent(int nodes, List<String> nodeList) {
|
||||
for (String node : nodeList) {
|
||||
for (String index : nodeShardsByIndex.get(node).keySet()) {
|
||||
int count = nodeShardsByIndex.get(node).get(index);
|
||||
int limit = (int) Math.ceil(indexShardCount.get(index) / (float) nodes);
|
||||
if (count > limit) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public List<String> getNodeList(String... nodesToValidate) {
|
||||
if (CollectionUtils.isEmpty(nodesToValidate)) {
|
||||
return Arrays.asList(clusterState.getNodes().resolveNodes());
|
||||
}
|
||||
return Arrays.asList(nodesToValidate);
|
||||
}
|
||||
|
||||
public void assertForIndexShardHotSpots(boolean expected, int nodes, String... nodesToValidate) {
|
||||
List<String> nodeList = getNodeList(nodesToValidate);
|
||||
buildNodeShardsByIndex();
|
||||
assertEquals(expected, isIndexHotSpotPresent(nodes, nodeList));
|
||||
}
|
||||
|
||||
public int allocateAndCheckIndexShardHotSpots(boolean expected, int nodes, String... nodesToValidate) {
|
||||
List<String> nodeList = getNodeList(nodesToValidate);
|
||||
boolean hasHotSpot = false;
|
||||
buildNodeShardsByIndex();
|
||||
List<ShardRouting> initShards;
|
||||
int movesToBalance = 0;
|
||||
do {
|
||||
assert movesToBalance <= MAX_REROUTE_STEPS_ALLOWED : "Could not balance cluster in max allowed moves";
|
||||
clusterState = allocation.applyStartedShards(clusterState,
|
||||
clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
clusterState = allocation.reroute(clusterState, "reroute");
|
||||
|
||||
initShards = clusterState.getRoutingNodes().shardsWithState(INITIALIZING);
|
||||
for (ShardRouting shard : initShards) {
|
||||
String node = shard.currentNodeId();
|
||||
String index = shard.getIndexName();
|
||||
nodeShardsByIndex.get(node).merge(index, 1, Integer::sum);
|
||||
|
||||
if (!nodeList.contains(node)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int count = nodeShardsByIndex.get(node).get(index);
|
||||
int limit = (int) Math.ceil(indexShardCount.get(index) / (float) nodes);
|
||||
if (count <= limit) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hot spots can occur due to the order in which shards get allocated to nodes.
|
||||
* A node with fewer shards may not be able to accept current shard due to
|
||||
* SameShardAllocationDecider, causing it to breach allocation constraint on
|
||||
* another node. We need to differentiate between such hot spots v/s actual hot
|
||||
* spots.
|
||||
*
|
||||
* A simple check could be to ensure there is no node with shards less than
|
||||
* allocation limit, that can accept current shard. However, in current
|
||||
* allocation algorithm, when nodes get throttled, shards are added to
|
||||
* ModelNodes without adding them to actual cluster (RoutingNodes). As a result,
|
||||
* the shards per node we see here, are different from the ones observed by
|
||||
* weight function in balancer. RoutingNodes with {@link count} < {@link limit}
|
||||
* may not have had the same count in the corresponding ModelNode seen by weight
|
||||
* function. We hence use the following alternate check --
|
||||
*
|
||||
* Given the way {@link limit} is defined, we should not have hot spots if *all*
|
||||
* nodes are eligible to accept the shard. A hot spot is acceptable, if either
|
||||
* all peer nodes have {@link count} > {@link limit}, or if even one node is
|
||||
* ineligible to accept the shard due to SameShardAllocationDecider, as this
|
||||
* leads to a chain of events that breach IndexShardsPerNode constraint on all
|
||||
* other nodes.
|
||||
*/
|
||||
|
||||
// If all peer nodes have count >= limit, hotspot is acceptable
|
||||
boolean limitBreachedOnAllNodes = true;
|
||||
for (RoutingNode peerNode : clusterState.getRoutingNodes()) {
|
||||
if (peerNode.nodeId().equals(node)) {
|
||||
continue;
|
||||
}
|
||||
int peerCount = nodeShardsByIndex.get(peerNode.nodeId()).getOrDefault(index, 0);
|
||||
if (peerCount < limit) {
|
||||
limitBreachedOnAllNodes = false;
|
||||
}
|
||||
}
|
||||
if (limitBreachedOnAllNodes) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If any node is ineligible to accept the shard, this hot spot is acceptable
|
||||
boolean peerHasSameShard = false;
|
||||
for (RoutingNode peerNode : clusterState.getRoutingNodes()) {
|
||||
if (peerNode.nodeId().equals(node)) {
|
||||
continue;
|
||||
}
|
||||
ShardRouting sameIdShardOnPeer = peerNode.getByShardId(shard.shardId());
|
||||
if (sameIdShardOnPeer != null && sameIdShardOnPeer.getIndexName().equals(index)) {
|
||||
peerHasSameShard = true;
|
||||
}
|
||||
}
|
||||
if (peerHasSameShard) {
|
||||
continue;
|
||||
}
|
||||
|
||||
hasHotSpot = true;
|
||||
}
|
||||
movesToBalance++;
|
||||
} while (!initShards.isEmpty());
|
||||
|
||||
logger.info("HotSpot: [{}], Moves to balance: [{}]", hasHotSpot, movesToBalance);
|
||||
assertEquals(expected, hasHotSpot);
|
||||
assertForIndexShardHotSpots(false, nodes); // Post re-balancing, cluster should always be hot spot free.
|
||||
return movesToBalance;
|
||||
}
|
||||
|
||||
public ClusterState allocateShardsAndBalance(ClusterState clusterState) {
|
||||
int iterations = 0;
|
||||
do {
|
||||
clusterState = allocation.applyStartedShards(clusterState,
|
||||
clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
clusterState = allocation.reroute(clusterState, "reroute");
|
||||
iterations++;
|
||||
} while (!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()
|
||||
&& iterations < MAX_REROUTE_STEPS_ALLOWED);
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
public void addNodesWithoutIndexing(int nodeCount, String node_prefix) {
|
||||
DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(clusterState.nodes()), node_prefix, nodeCount);
|
||||
clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
|
||||
}
|
||||
|
||||
public void terminateNodes(String... nodesToTerminate) {
|
||||
if (CollectionUtils.isEmpty(nodesToTerminate)) {
|
||||
return;
|
||||
}
|
||||
DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes());
|
||||
Arrays.asList(nodesToTerminate).forEach(node -> nb.remove(node));
|
||||
clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build();
|
||||
clusterState = allocation.disassociateDeadNodes(clusterState, true, "node-terminated");
|
||||
clusterState = allocateShardsAndBalance(clusterState);
|
||||
}
|
||||
|
||||
public void addNodesWithIndexing(int nodeCount, String node_prefix, int indices, int shards, int replicas) {
|
||||
final String NEW_INDEX_PREFIX = "new_index_";
|
||||
Metadata md = buildMetadata(Metadata.builder(clusterState.getMetadata()), NEW_INDEX_PREFIX, indices, shards,
|
||||
replicas);
|
||||
RoutingTable rb = buildRoutingTable(RoutingTable.builder(clusterState.getRoutingTable()), md, NEW_INDEX_PREFIX,
|
||||
indices);
|
||||
DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(clusterState.nodes()), node_prefix, nodeCount);
|
||||
clusterState = ClusterState.builder(clusterState).metadata(md).routingTable(rb).nodes(nodes).build();
|
||||
for (int i = 0; i < indices; i++) {
|
||||
indexShardCount.put(NEW_INDEX_PREFIX + i, shards * (replicas + 1));
|
||||
}
|
||||
}
|
||||
|
||||
public void addIndices(String index_prefix, int indices, int shards, int replicas) {
|
||||
Metadata md = buildMetadata(Metadata.builder(clusterState.getMetadata()), index_prefix, indices, shards,
|
||||
replicas);
|
||||
RoutingTable rb = buildRoutingTable(RoutingTable.builder(clusterState.getRoutingTable()), md, index_prefix,
|
||||
indices);
|
||||
clusterState = ClusterState.builder(clusterState).metadata(md).routingTable(rb).build();
|
||||
|
||||
if (indexShardCount == null) {
|
||||
indexShardCount = new HashMap<>();
|
||||
}
|
||||
|
||||
for (int i = 0; i < indices; i++) {
|
||||
indexShardCount.put(index_prefix + i, shards * (replicas + 1));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue