diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java
new file mode 100644
index 00000000000..fd255166718
--- /dev/null
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * Allocation constraints specify conditions which, if breached, reduce the
+ * priority of a node for receiving shard allocations.
+ */
+public class AllocationConstraints {
+    public final long CONSTRAINT_WEIGHT = 1000000L;
+    private List<Predicate<ConstraintParams>> constraintPredicates;
+
+    public AllocationConstraints() {
+        this.constraintPredicates = new ArrayList<>(1);
+        this.constraintPredicates.add(isIndexShardsPerNodeBreached());
+    }
+
+    class ConstraintParams {
+        private BalancedShardsAllocator.Balancer balancer;
+        private BalancedShardsAllocator.ModelNode node;
+        private String index;
+
+        ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
+                String index) {
+            this.balancer = balancer;
+            this.node = node;
+            this.index = index;
+        }
+    }
+
+    /**
+     * Evaluates the configured allocation constraint predicates for a given node - index
+     * combination, and returns a weight value based on the number of breached
+     * constraints.
+     *
+     * The constraint weight should be added to the weight calculated by the weight
+     * function, to reduce the priority of allocating on nodes with breached
+     * constraints.
+     *
+     * This weight is used only for unassigned shards, to avoid overloading a newly added node.
+     * Weight calculation in other scenarios, like shard movement and rebalancing, remains unaffected by this function.
+     */
+    public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
+            String index) {
+        int constraintsBreached = 0;
+        ConstraintParams params = new ConstraintParams(balancer, node, index);
+        for (Predicate<ConstraintParams> predicate : constraintPredicates) {
+            if (predicate.test(params)) {
+                constraintsBreached++;
+            }
+        }
+        return constraintsBreached * CONSTRAINT_WEIGHT;
+    }
+
+    /**
+     * Constraint to control the number of shards of an index allocated on a single
+     * node.
+     *
+     * In the current weight function implementation, when a node has significantly
+     * fewer shards than other nodes (e.g. during single new node addition or node
+     * replacement), its weight is much lower than that of other nodes. All shard
+     * allocations at this time tend to land on the new node with skewed weight. This breaks
+     * index-level balance in the cluster by placing all shards of the same index
+     * on one node, often resulting in a hot spot on that node.
+     *
+     * This constraint is breached when the balancer attempts to allocate more than
+     * the average number of shards per index per node.
+     */
+    private Predicate<ConstraintParams> isIndexShardsPerNodeBreached() {
+        return (params) -> {
+            int currIndexShardsOnNode = params.node.numShards(params.index);
+            int allowedIndexShardsPerNode = (int) Math.ceil(params.balancer.avgShardsPerNode(params.index));
+            return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
+        };
+    }
+
+}
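Editor's note: the standalone sketch below (not part of the change) mirrors the arithmetic of `AllocationConstraints.weight()` and `isIndexShardsPerNodeBreached()` above, to show how a breached constraint reorders nodes when unassigned shards are allocated. The node names and weight values are made up for illustration.

```java
public class ConstraintWeightSketch {
    static final long CONSTRAINT_WEIGHT = 1000000L; // same magnitude as AllocationConstraints.CONSTRAINT_WEIGHT

    // Mirrors isIndexShardsPerNodeBreached(): the constraint trips once a node holds ceil(avg) shards of the index.
    static long constraintWeight(int shardsOfIndexOnNode, float avgShardsPerNode) {
        int allowedIndexShardsPerNode = (int) Math.ceil(avgShardsPerNode);
        return shardsOfIndexOnNode >= allowedIndexShardsPerNode ? CONSTRAINT_WEIGHT : 0;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 10 shards of one index over 4 nodes -> avg 2.5, allowed 3 per node.
        float avgShardsPerNode = 10 / 4.0f;
        float newNodeBalancerWeight = -4.5f;  // an empty new node usually has the lowest balancer weight
        float olderNodeBalancerWeight = 1.5f;

        // While the new node holds fewer than ceil(avg) shards of the index, it keeps the lowest total weight:
        System.out.println(newNodeBalancerWeight + constraintWeight(2, avgShardsPerNode));   // -4.5
        // Once it reaches ceil(avg), the constraint weight pushes it behind the other nodes:
        System.out.println(newNodeBalancerWeight + constraintWeight(3, avgShardsPerNode));   // 999995.5
        System.out.println(olderNodeBalancerWeight + constraintWeight(1, avgShardsPerNode)); // 1.5 -> now preferred
    }
}
```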
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index a7b4baa302a..705dd2240d6 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -45,6 +45,7 @@ import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.UnassignedInfo;
 import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
 import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
+import org.opensearch.cluster.routing.allocation.AllocationConstraints;
 import org.opensearch.cluster.routing.allocation.AllocationDecision;
 import org.opensearch.cluster.routing.allocation.MoveDecision;
 import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
@@ -192,7 +193,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
         return weightFunction.shardBalance;
     }
 
-
     /**
      * This class is the primary weight function used to create balanced over nodes and shards in the cluster.
      * Currently this function has 3 properties:
@@ -216,13 +216,16 @@ public class BalancedShardsAllocator implements ShardsAllocator {
      * </li>
      * </ul>
      * <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
+     *
+     * package-private for testing
      */
-    private static class WeightFunction {
+    static class WeightFunction {
 
         private final float indexBalance;
         private final float shardBalance;
         private final float theta0;
         private final float theta1;
+        private AllocationConstraints constraints;
 
         WeightFunction(float indexBalance, float shardBalance) {
             float sum = indexBalance + shardBalance;
@@ -233,6 +236,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
             theta1 = indexBalance / sum;
             this.indexBalance = indexBalance;
             this.shardBalance = shardBalance;
+            this.constraints = new AllocationConstraints();
+        }
+
+        public float weightWithAllocationConstraints(Balancer balancer, ModelNode node, String index) {
+            float balancerWeight = weight(balancer, node, index);
+            return balancerWeight + constraints.weight(balancer, node, index);
         }
 
         float weight(Balancer balancer, ModelNode node, String index) {
@@ -411,7 +420,10 @@
                     boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
                     // calculate the delta of the weights of the two nodes if we were to add the shard to the
                     // node in question and move it away from the node that currently holds it.
-                    boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
+                    // hence we add 2.0f to the weight delta
+                    float proposedDelta = 2.0f + nodeWeight - currentWeight;
+                    boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
+
                     rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
                     // if the simulated weight delta with the shard moved away is better than the weight delta
                     // with the shard remaining on the current node, and we are allowed to allocate to the
@@ -964,7 +976,7 @@
                 }
                 // weight of this index currently on the node
-                float currentWeight = weight.weight(this, node, shard.getIndexName());
+                float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName());
                 // moving the shard would not improve the balance, and we are not in explain mode, so short circuit
                 if (currentWeight > minWeight && explain == false) {
                     continue;
                 }
@@ -1002,7 +1014,7 @@
                         updateMinNode = currentDecision.type() == Type.YES;
                     }
                 } else {
-                    updateMinNode = true;
+                    updateMinNode = currentWeight < minWeight;
                 }
                 if (updateMinNode) {
                     minNode = node;
                 }
@@ -1086,7 +1098,7 @@
 
     }
 
-    static class ModelNode implements Iterable<ModelIndex> {
+    public static class ModelNode implements Iterable<ModelIndex> {
         private final Map<String, ModelIndex> indices = new HashMap<>();
         private int numShards = 0;
         private final RoutingNode routingNode;
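For context on the `proposedDelta` change in the rebalance logic above: in the balancer's weight function a single shard contributes `theta0 + theta1 = 1` to a node's weight, so simulating a one-shard move subtracts roughly 1 from the heavier node and adds 1 to the lighter one; the simulated delta is therefore the current delta shifted by 2. A small worked example with made-up weights (illustrative only, not the allocator's code):

```java
public class RebalanceDeltaSketch {
    public static void main(String[] args) {
        float nodeWeight = 2.0f;    // lighter node, candidate to receive the shard
        float currentWeight = 5.5f; // heavier node, currently holding the shard
        float threshold = 1.0f;

        float currentDelta = Math.abs(currentWeight - nodeWeight);   // 3.5

        // After moving one shard, the lighter node weighs nodeWeight + 1 and the heavier one currentWeight - 1,
        // so the post-move delta is (nodeWeight + 1) - (currentWeight - 1) = 2.0f + nodeWeight - currentWeight.
        float proposedDelta = 2.0f + nodeWeight - currentWeight;     // -1.5

        boolean deltaAboveThreshold = currentDelta > threshold;             // true
        boolean betterWeightWithShardAdded = proposedDelta < currentDelta;  // true -> the move improves balance
        System.out.println("rebalance conditions met: " + (deltaAboveThreshold && betterWeightWithShardAdded));
    }
}
```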
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java
new file mode 100644
index 00000000000..c51bbab922b
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java
@@ -0,0 +1,65 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.cluster.OpenSearchAllocationTestCase;
+import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AllocationConstraintsTests extends OpenSearchAllocationTestCase {
+
+    public void testSettings() {
+        Settings.Builder settings = Settings.builder();
+        ClusterSettings service = new ClusterSettings(Settings.builder().build(),
+                ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service);
+
+        settings = Settings.builder();
+        float indexBalanceFactor = randomFloat();
+        float shardBalance = randomFloat();
+        float threshold = randomFloat();
+        settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor);
+        settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance);
+        settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold);
+
+        service.applySettings(settings.build());
+
+        assertEquals(indexBalanceFactor, allocator.getIndexBalance(), 0.01);
+        assertEquals(shardBalance, allocator.getShardBalance(), 0.01);
+        assertEquals(threshold, allocator.getThreshold(), 0.01);
+    }
+
+    /**
+     * Test constraint evaluation logic for the IndexShardsPerNode constraint, when it is
+     * satisfied and when it is breached.
+     */
+    public void testIndexShardsPerNodeConstraint() {
+        BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class);
+        BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class);
+        AllocationConstraints constraints = new AllocationConstraints();
+
+        int shardCount = randomIntBetween(1, 500);
+        float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f;
+
+        when(balancer.avgShardsPerNode(anyString())).thenReturn(avgShardsPerNode);
+        when(node.numShards(anyString())).thenReturn(shardCount);
+        when(node.getNodeId()).thenReturn("test-node");
+
+        long expectedWeight = (shardCount >= avgShardsPerNode) ? constraints.CONSTRAINT_WEIGHT : 0;
+        assertEquals(expectedWeight, constraints.weight(balancer, node, "index"));
+    }
+
+}
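A note on the expectation in `testIndexShardsPerNodeConstraint` above: the test compares `shardCount` with the raw average while the production predicate compares it with `Math.ceil(average)`. For integer shard counts the two conditions are equivalent, which the following quick check (illustrative only, not part of the change) demonstrates:

```java
import java.util.Random;

public class CeilEquivalenceCheck {
    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 1_000_000; i++) {
            int shardCount = random.nextInt(500) + 1;       // mirrors randomIntBetween(1, 500)
            float avg = 1.0f + random.nextFloat() * 999.0f; // mirrors the avgShardsPerNode range
            boolean testExpectation = shardCount >= avg;                      // what the unit test asserts
            boolean productionPredicate = shardCount >= (int) Math.ceil(avg); // what the constraint evaluates
            if (testExpectation != productionPredicate) {
                throw new AssertionError("mismatch at shardCount=" + shardCount + ", avg=" + avg);
            }
        }
        System.out.println("shardCount >= avg  <=>  shardCount >= ceil(avg) for integer shard counts");
    }
}
```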
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
new file mode 100644
index 00000000000..218eb0c0b06
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
@@ -0,0 +1,202 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.cluster.ClusterInfo;
+import org.opensearch.cluster.ClusterInfoService;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.DiskUsage;
+import org.opensearch.cluster.OpenSearchAllocationWithConstraintsTestCase;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.collect.ImmutableOpenMap;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.shard.ShardId;
+import org.opensearch.test.VersionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+
+public class IndexShardConstraintDeciderOverlapTests extends OpenSearchAllocationWithConstraintsTestCase {
+
+    /**
+     * A high watermark breach blocks new shard allocations to the affected nodes. If the shard count on such
+     * nodes is low, this will cause the IndexShardsPerNode constraint to breach.
+     *
+     * This test verifies that this doesn't lead to unassigned shards, and that there are no hot spots on eligible
+     * nodes.
+     */
+    public void testHighWatermarkBreachWithLowShardCount() {
+        setupInitialCluster(3, 15, 10, 1);
+        addNodesWithIndexing(1, "high_watermark_node_", 6, 5, 1);
+
+        // Disk threshold settings enabled
+        Settings settings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), 0.95)
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 1)
+            .put("cluster.routing.allocation.cluster_concurrent_recoveries", 1)
+            .build();
+
+        // Build shard sizes and disk usages
+        ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
+        usagesBuilder.put("node_0", new DiskUsage("node_0", "node_0", "/dev/null", 100, 80)); // 20% used
+        usagesBuilder.put("node_1", new DiskUsage("node_1", "node_1", "/dev/null", 100, 55)); // 45% used
+        usagesBuilder.put("node_2", new DiskUsage("node_2", "node_2", "/dev/null", 100, 35)); // 65% used
+        usagesBuilder.put("high_watermark_node_0",
+            new DiskUsage("high_watermark_node_0", "high_watermark_node_0", "/dev/null", 100, 10)); // 90% used
+
+        ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
+        ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
+        clusterState.getRoutingTable().allShards().forEach(shard ->
+            shardSizesBuilder.put(shardIdentifierFromRouting(shard), 1L)); // Each shard is 1 byte
+        ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
+
+        final ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace =
+            new ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace>()
+                .fPut(getNodeAndDevNullPath("node_0"), getReservedSpace())
+                .fPut(getNodeAndDevNullPath("node_1"), getReservedSpace())
+                .fPut(getNodeAndDevNullPath("node_2"), getReservedSpace())
+                .fPut(getNodeAndDevNullPath("high_watermark_node_0"), getReservedSpace())
+                .build();
+        final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, reservedSpace);
+        ClusterInfoService cis = () -> clusterInfo;
+        allocation = createAllocationService(settings, cis);
+
"node_1", "node_2"); + assertForIndexShardHotSpots(true, 4); + assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty()); + assertTrue(clusterState.getRoutingNodes().node("high_watermark_node_0").isEmpty()); + + + /* Shard sizes that would breach high watermark on node_2 if allocated. + */ + addIndices("big_index_", 1, 10, 0); + ImmutableOpenMap.Builder bigIndexShardSizeBuilder = ImmutableOpenMap.builder(shardSizes); + clusterState.getRoutingNodes().unassigned() + .forEach(shard -> bigIndexShardSizeBuilder.put(shardIdentifierFromRouting(shard), 20L)); + shardSizes = bigIndexShardSizeBuilder.build(); + final ClusterInfo bigIndexClusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, reservedSpace); + cis = () -> bigIndexClusterInfo; + allocation = createAllocationService(settings, cis); + + allocateAndCheckIndexShardHotSpots(false, 2, "node_0", "node_1"); + assertForIndexShardHotSpots(true, 4); + assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty()); + for (ShardRouting shard: clusterState.getRoutingTable().index("big_index_0").shardsWithState(STARTED)) { + assertNotEquals("node_2", shard.currentNodeId()); + } + } + + private ClusterInfo.NodeAndPath getNodeAndDevNullPath(String node) { + return new ClusterInfo.NodeAndPath(node, "/dev/null"); + } + + private ClusterInfo.ReservedSpace getReservedSpace() { + return new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), 2).build(); + } + + /** + * Test clusters with subset of nodes on older version. + * New version shards should not migrate to old version nodes, even if this creates potential hot spots. + */ + public void testNodeVersionCompatibilityOverlap() { + setupInitialCluster(3, 6, 10, 1); + + // Add an old version node and exclude a new version node + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("old_node", VersionUtils.getPreviousVersion())); + clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build(); + buildAllocationService("node_0"); + + // Shards should only go to remaining new version nodes + allocateAndCheckIndexShardHotSpots(false, 2, "node_1", "node_2"); + assertForIndexShardHotSpots(true, 4); + assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty()); + + for (ShardRouting shard: clusterState.getRoutingTable().allShards()) { + assertNotEquals("node_0", shard.currentNodeId()); + assertNotEquals("old_node", shard.currentNodeId()); + } + } + + /** + * Test zone aware clusters with balanced zones. + * No hot spots expected. + */ + public void testZoneBalanced() { + Map nodesPerZone = new HashMap<>(); + nodesPerZone.put("zone_0", 3); + nodesPerZone.put("zone_1", 3); + createEmptyZoneAwareCluster(nodesPerZone); + addIndices("index_", 4, 5, 1); + + buildZoneAwareAllocationService(); + allocateAndCheckIndexShardHotSpots(false, 6); + + resetCluster(); + buildZoneAwareAllocationService(); + allocateAndCheckIndexShardHotSpots(false, 6); + } + + /** + * Test zone aware clusters with unbalanced zones. + * Hot spots expected as awareness forces shards per zone restrictions. 
+     */
+    public void testZoneUnbalanced() {
+        Map<String, Integer> nodesPerZone = new HashMap<>();
+        nodesPerZone.put("zone_0", 5);
+        nodesPerZone.put("zone_1", 1);
+        createEmptyZoneAwareCluster(nodesPerZone);
+        addIndices("index_", 1, 5, 1);
+        updateInitialCluster();
+
+        buildZoneAwareAllocationService();
+        clusterState = allocateShardsAndBalance(clusterState);
+        assertForIndexShardHotSpots(true, 6);
+        assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
+
+        resetCluster();
+        buildZoneAwareAllocationService();
+        clusterState = allocateShardsAndBalance(clusterState);
+        assertForIndexShardHotSpots(true, 6);
+        assertTrue(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).isEmpty());
+    }
+
+    /**
+     * ClusterInfo that always points to DevNull.
+     */
+    public static class DevNullClusterInfo extends ClusterInfo {
+        public DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
+                ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
+                ImmutableOpenMap<String, Long> shardSizes,
+                ImmutableOpenMap<NodeAndPath, ReservedSpace> reservedSpace) {
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
+        }
+
+        @Override
+        public String getDataPath(ShardRouting shardRouting) {
+            return "/dev/null";
+        }
+    }
+}
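As a cross-check of the disk numbers used in the watermark test above (illustrative only): DiskUsage takes total and free bytes, so the node seeded with 10 free out of 100 total sits at 90% usage, above the 0.8 high watermark, while the three regular nodes stay below it. A tiny sketch of that arithmetic:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WatermarkMathSketch {
    public static void main(String[] args) {
        double highWatermark = 0.8; // matches the test's high disk watermark setting
        long totalBytes = 100L;

        Map<String, Long> freeBytes = new LinkedHashMap<>();
        freeBytes.put("node_0", 80L);
        freeBytes.put("node_1", 55L);
        freeBytes.put("node_2", 35L);
        freeBytes.put("high_watermark_node_0", 10L);

        freeBytes.forEach((node, free) -> {
            double used = (totalBytes - free) / (double) totalBytes;
            boolean blocked = used > highWatermark; // new allocations to this node would be rejected
            System.out.printf("%s: %.0f%% used, blocked for new shards: %b%n", node, used * 100, blocked);
        });
    }
}
```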
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardHotSpotTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardHotSpotTests.java
new file mode 100644
index 00000000000..eeb8c0faab2
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardHotSpotTests.java
@@ -0,0 +1,123 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.cluster.OpenSearchAllocationWithConstraintsTestCase;
+
+public class IndexShardHotSpotTests extends OpenSearchAllocationWithConstraintsTestCase {
+
+    /**
+     * Test single node replacement without active indexing.
+     */
+    public void testNodeReplacement() {
+        setupInitialCluster(5, 1, 5, 1);
+        terminateNodes("node_1");
+        assertForIndexShardHotSpots(false, 4);
+        addNodesWithoutIndexing(1, "new_node_");
+        int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
+
+        setupInitialCluster(5, 1, 5, 1);
+        terminateNodes("node_1");
+        assertForIndexShardHotSpots(false, 4);
+        addNodesWithoutIndexing(1, "new_node_");
+        int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
+        assertTrue(movesForModeUnassigned <= movesForModeNone);
+    }
+
+    /**
+     * Test single node replacement with active indexing.
+     */
+    public void testNodeReplacementWithIndexing() {
+        setupInitialCluster(5, 30, 5, 1);
+        buildAllocationService();
+        terminateNodes("node_1");
+        assertForIndexShardHotSpots(false, 4);
+        addNodesWithIndexing(1, "new_node_", 3, 20, 1);
+        int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
+
+        resetCluster();
+        buildAllocationService();
+        terminateNodes("node_1");
+        assertForIndexShardHotSpots(false, 4);
+        addNodesWithIndexing(1, "new_node_", 3, 20, 1);
+        int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 5, "new_node_0");
+        assertTrue(movesForModeUnassigned <= movesForModeNone);
+    }
+
+    /**
+     * Test skewed cluster scale out via single node addition during active indexing.
+     */
+    public void testSkewedClusterScaleOut() {
+        setupInitialCluster(3, 30, 10, 1);
+        buildAllocationService();
+        addNodesWithIndexing(1, "new_node_", 8, 10, 1);
+        int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
+
+        resetCluster();
+        buildAllocationService();
+        addNodesWithIndexing(1, "new_node_", 8, 10, 1);
+        int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
+        assertTrue(movesForModeUnassigned <= movesForModeNone);
+    }
+
+    /**
+     * Test an under-replicated yellow cluster scaling out to green.
+     *
+     * This scenario is not expected to create hot spots even without constraints enabled. The
+     * test is a sanity check to ensure allocation constraints don't worsen the situation.
+     */
+    public void testUnderReplicatedClusterScaleOut() {
+        setupInitialCluster(3, 30, 10, 3);
+        buildAllocationService();
+        addNodesWithoutIndexing(1, "new_node_");
+        int movesForModeNone = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
+
+        resetCluster();
+        buildAllocationService();
+        addNodesWithoutIndexing(1, "new_node_");
+        int movesForModeUnassigned = allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0");
+        assertTrue(movesForModeUnassigned <= movesForModeNone);
+    }
+
+    /**
+     * Test the cluster scale-in scenario, where nodes are gracefully excluded from the
+     * cluster before termination.
+     *
+     * During moveShards(), shards are picked from across indexes in an interleaved manner.
+     * This prevents hot spots by picking shards evenly. Since shard order can change
+     * in subsequent runs, we are not guaranteed fewer moves than a run without allocation constraints.
+     *
+     * Move tests are hence just a sanity check, to ensure we don't create any unexpected hot spots with the
+     * allocation settings.
+     */
+    public void testClusterScaleIn() {
+        setupInitialCluster(4, 30, 10, 1);
+        buildAllocationService("node_0,node_1");
+        allocateAndCheckIndexShardHotSpots(false, 2, "node_2", "node_3");
+
+        resetCluster();
+        buildAllocationService("node_0,node_1");
+        allocateAndCheckIndexShardHotSpots(false, 2, "node_2", "node_3");
+    }
+
+    /**
+     * Test the cluster scale-in scenario with a skewed shard distribution on the remaining nodes.
+     */
+    public void testClusterScaleInWithSkew() {
+        setupInitialCluster(4, 100, 5, 1);
+        buildAllocationService("node_0,node_1");
+        addNodesWithoutIndexing(1, "new_node_");
+        allocateAndCheckIndexShardHotSpots(false, 3, "node_2", "node_3", "new_node_0");
+
+        resetCluster();
+        buildAllocationService("node_0,node_1");
+        addNodesWithoutIndexing(1, "new_node_");
+        allocateAndCheckIndexShardHotSpots(false, 3, "node_2", "node_3", "new_node_0");
+    }
+}
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsLimitAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsLimitAllocationTests.java
index 7e8e957da86..9c9ee0449be 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsLimitAllocationTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsLimitAllocationTests.java
@@ -34,6 +34,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.hamcrest.Matcher;
 import org.opensearch.Version;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.OpenSearchAllocationTestCase;
@@ -48,7 +49,13 @@
 import org.opensearch.common.settings.Settings;
 
 import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.anyOf;
 
 public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase {
     private final Logger logger = LogManager.getLogger(ShardsLimitAllocationTests.class);
@@ -201,13 +208,20 @@
 
         clusterState = startInitializingShardsAndReroute(strategy, clusterState);
 
-        assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), STARTED), equalTo(10));
+        // AllocationConstraints will choose node2 for 3 test1 shards and the rest are assigned to node1.
+        // These shards then relocate to node2 after balanceByWeights() is invoked.
+        assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), STARTED), equalTo(8));
+        assertThat(RoutingNodesUtils.numberOfShardsOfType(clusterState.getRoutingNodes(), RELOCATING), equalTo(2));
+        // Expected values depend on which index shards were chosen for relocation.
+        String relocatingIndex = getRelocatingIndex(clusterState);
+        Matcher<String> node1Matcher = anyOf(equalTo("test"), equalTo("test1"));
equalTo("test1") : node1Matcher; for (ShardRouting shardRouting : clusterState.getRoutingNodes().node("node1")) { - assertThat(shardRouting.getIndexName(), equalTo("test")); + assertThat(shardRouting.getIndexName(), node1Matcher); } for (ShardRouting shardRouting : clusterState.getRoutingNodes().node("node2")) { - assertThat(shardRouting.getIndexName(), equalTo("test1")); + assertThat(shardRouting.getIndexName(), node2Matcher); } logger.info("update {} for test, see that things move", @@ -226,10 +240,12 @@ public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase { logger.info("reroute after setting"); clusterState = strategy.reroute(clusterState, "reroute"); - assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(3)); - assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3)); + int node1RelocCount = relocatingIndex.equals("test1") ? 4 : 2; + assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(RELOCATING), equalTo(node1RelocCount)); + + int node2RelocCount = relocatingIndex.equals("test1") ? 2 : 0; + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(RELOCATING), equalTo(node2RelocCount)); + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(3 - node2RelocCount)); // the first move will destroy the balance and the balancer will move 2 shards from node2 to node one right after // moving the nodes to node2 since we consider INITIALIZING nodes during rebalance clusterState = startInitializingShardsAndReroute(strategy, clusterState); @@ -237,4 +253,13 @@ public class ShardsLimitAllocationTests extends OpenSearchAllocationTestCase { assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(STARTED), equalTo(5)); assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(5)); } + + private String getRelocatingIndex(ClusterState clusterState) { + final Set indices = Stream.of(clusterState.getRoutingNodes()) + .flatMap(node -> node.shardsWithState(RELOCATING).stream().map(x -> x.getIndexName())) + .collect(Collectors.toSet()); + assertThat(indices.size(), equalTo(1)); + return indices.iterator().next(); + + } } diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java new file mode 100644 index 00000000000..60ac3b6d411 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java @@ -0,0 +1,336 @@ +/* + * Copyright OpenSearch Contributors. 
diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java
new file mode 100644
index 00000000000..60ac3b6d411
--- /dev/null
+++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationWithConstraintsTestCase.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright OpenSearch Contributors.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.cluster;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.UnassignedInfo;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.CollectionUtils;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singletonMap;
+import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
+
+public abstract class OpenSearchAllocationWithConstraintsTestCase extends OpenSearchAllocationTestCase {
+
+    protected MockAllocationService allocation;
+    private ClusterState initialClusterState;
+    protected ClusterState clusterState;
+    private HashMap<String, Integer> indexShardCount;
+    private HashMap<String, HashMap<String, Integer>> nodeShardsByIndex;
+    private static final int MAX_REROUTE_STEPS_ALLOWED = 1500;
+
+    @Before
+    public void clearState() {
+        allocation = null;
+        initialClusterState = null;
+        clusterState = null;
+        indexShardCount = null;
+        nodeShardsByIndex = null;
+    }
+
+    public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
+        return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
+    }
+
+    public void buildAllocationService() {
+        Settings.Builder sb = Settings.builder();
+        buildAllocationService(sb);
+    }
+
+    public void buildAllocationService(String excludeNodes) {
+        logger.info("Excluding nodes: [{}]", excludeNodes);
+        Settings.Builder sb = Settings.builder().put("cluster.routing.allocation.exclude._node_id", excludeNodes);
+
+        buildAllocationService(sb);
+    }
+
+    public void buildZoneAwareAllocationService() {
+        logger.info("Creating zone aware cluster");
+        Settings.Builder sb = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone");
+
+        buildAllocationService(sb);
+    }
+
+    public void buildAllocationService(Settings.Builder sb) {
+        sb.put("cluster.routing.allocation.node_concurrent_recoveries", 1);
+        sb.put("cluster.routing.allocation.cluster_concurrent_rebalance", 1);
+        allocation = createAllocationService(sb.build());
+    }
+
+    public Metadata buildMetadata(Metadata.Builder mb, String indexPrefix, int numberOfIndices, int numberOfShards,
+            int numberOfReplicas) {
+        for (int i = 0; i < numberOfIndices; i++) {
+            mb.put(IndexMetadata.builder(indexPrefix + i)
+                .settings(settings(Version.CURRENT)
+                    .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
+                .numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas));
+        }
+        return mb.build();
+    }
+
+    public RoutingTable buildRoutingTable(RoutingTable.Builder rb, Metadata metadata, String indexPrefix,
+            int numberOfIndices) {
+        for (int i = 0; i < numberOfIndices; i++) {
+            rb.addAsNew(metadata.index(indexPrefix + i));
+        }
+        return rb.build();
+    }
+
+    public DiscoveryNodes addNodes(DiscoveryNodes.Builder nb, String nodePrefix, int nodesAdded) {
+        for (int i = 0; i < nodesAdded; i++) {
+            nb.add(newNode(nodePrefix + i, singletonMap("_node_id", nodePrefix + i)));
+        }
+        return nb.build();
+    }
+
+    public void resetCluster() {
+        clusterState = initialClusterState;
+    }
+
+    public void updateInitialCluster() {
+        initialClusterState = clusterState;
+    }
+
+    public void createEmptyZoneAwareCluster(Map<String, Integer> nodesPerZone) {
+        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
+        int nodeId = 0;
+        for (String zone : nodesPerZone.keySet()) {
+            for (int i = 0; i < nodesPerZone.get(zone); i++) {
+                nb.add(newNode("node_" + nodeId, singletonMap("zone", zone)));
+                nodeId++;
+            }
+        }
+        initialClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(nb.build()).build();
+        clusterState = initialClusterState;
+    }
+
+    public void setupInitialCluster(int nodeCount, int indices, int shards, int replicas) {
+        final String DEFAULT_INDEX_PREFIX = "index_";
+        final String DEFAULT_NODE_PREFIX = "node_";
+
+        Metadata metadata = buildMetadata(Metadata.builder(), DEFAULT_INDEX_PREFIX, indices, shards, replicas);
+        RoutingTable routingTable = buildRoutingTable(RoutingTable.builder(), metadata, DEFAULT_INDEX_PREFIX, indices);
+        DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(), DEFAULT_NODE_PREFIX, nodeCount);
+        initialClusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable)
+            .nodes(nodes).build();
+
+        buildAllocationService();
+        initialClusterState = allocateShardsAndBalance(initialClusterState);
+        clusterState = initialClusterState;
+
+        indexShardCount = new HashMap<>();
+        for (int i = 0; i < indices; i++) {
+            indexShardCount.put(DEFAULT_INDEX_PREFIX + i, shards * (replicas + 1));
+        }
+
+        assertForIndexShardHotSpots(false, nodeCount); // Initial cluster should be balanced
+        logger.info("Initial cluster created...");
+    }
+
+    public void buildNodeShardsByIndex() {
+        nodeShardsByIndex = new HashMap<>();
+        for (RoutingNode rn : clusterState.getRoutingNodes()) {
+            String node = rn.nodeId();
+            nodeShardsByIndex.put(node, new HashMap<>());
+            for (ShardRouting shard : rn) {
+                assert shard.currentNodeId().equals(node);
+                if (shard.state() == INITIALIZING || shard.state() == STARTED) {
+                    nodeShardsByIndex.get(node).merge(shard.getIndexName(), 1, Integer::sum);
+                }
+            }
+        }
+    }
+
+    public boolean isIndexHotSpotPresent(int nodes, List<String> nodeList) {
+        for (String node : nodeList) {
+            for (String index : nodeShardsByIndex.get(node).keySet()) {
+                int count = nodeShardsByIndex.get(node).get(index);
+                int limit = (int) Math.ceil(indexShardCount.get(index) / (float) nodes);
+                if (count > limit) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public List<String> getNodeList(String... nodesToValidate) {
+        if (CollectionUtils.isEmpty(nodesToValidate)) {
+            return Arrays.asList(clusterState.getNodes().resolveNodes());
+        }
+        return Arrays.asList(nodesToValidate);
+    }
+
+    public void assertForIndexShardHotSpots(boolean expected, int nodes, String... nodesToValidate) {
+        List<String> nodeList = getNodeList(nodesToValidate);
+        buildNodeShardsByIndex();
+        assertEquals(expected, isIndexHotSpotPresent(nodes, nodeList));
+    }
+
+    public int allocateAndCheckIndexShardHotSpots(boolean expected, int nodes, String... nodesToValidate) {
+        List<String> nodeList = getNodeList(nodesToValidate);
+        boolean hasHotSpot = false;
+        buildNodeShardsByIndex();
+        List<ShardRouting> initShards;
+        int movesToBalance = 0;
+        do {
+            assert movesToBalance <= MAX_REROUTE_STEPS_ALLOWED : "Could not balance cluster in max allowed moves";
+            clusterState = allocation.applyStartedShards(clusterState,
+                clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+            clusterState = allocation.reroute(clusterState, "reroute");
+
+            initShards = clusterState.getRoutingNodes().shardsWithState(INITIALIZING);
+            for (ShardRouting shard : initShards) {
+                String node = shard.currentNodeId();
+                String index = shard.getIndexName();
+                nodeShardsByIndex.get(node).merge(index, 1, Integer::sum);
+
+                if (!nodeList.contains(node)) {
+                    continue;
+                }
+
+                int count = nodeShardsByIndex.get(node).get(index);
+                int limit = (int) Math.ceil(indexShardCount.get(index) / (float) nodes);
+                if (count <= limit) {
+                    continue;
+                }
+
+                /**
+                 * Hot spots can occur due to the order in which shards get allocated to nodes.
+                 * A node with fewer shards may not be able to accept the current shard due to
+                 * SameShardAllocationDecider, causing it to breach the allocation constraint on
+                 * another node. We need to differentiate between such hot spots and actual hot
+                 * spots.
+                 *
+                 * A simple check could be to ensure there is no node with fewer shards than the
+                 * allocation limit that can accept the current shard. However, in the current
+                 * allocation algorithm, when nodes get throttled, shards are added to
+                 * ModelNodes without adding them to the actual cluster (RoutingNodes). As a result,
+                 * the shards per node we see here are different from the ones observed by the
+                 * weight function in the balancer. RoutingNodes with {@link count} < {@link limit}
+                 * may not have had the same count in the corresponding ModelNode seen by the weight
+                 * function. We hence use the following alternate check --
+                 *
+                 * Given the way {@link limit} is defined, we should not have hot spots if *all*
+                 * nodes are eligible to accept the shard. A hot spot is acceptable if either
+                 * all peer nodes have {@link count} > {@link limit}, or if even one node is
+                 * ineligible to accept the shard due to SameShardAllocationDecider, as this
+                 * leads to a chain of events that breach the IndexShardsPerNode constraint on all
+                 * other nodes.
+                 */
+
+                // If all peer nodes have count >= limit, the hot spot is acceptable
+                boolean limitBreachedOnAllNodes = true;
+                for (RoutingNode peerNode : clusterState.getRoutingNodes()) {
+                    if (peerNode.nodeId().equals(node)) {
+                        continue;
+                    }
+                    int peerCount = nodeShardsByIndex.get(peerNode.nodeId()).getOrDefault(index, 0);
+                    if (peerCount < limit) {
+                        limitBreachedOnAllNodes = false;
+                    }
+                }
+                if (limitBreachedOnAllNodes) {
+                    continue;
+                }
+
+                // If any node is ineligible to accept the shard, this hot spot is acceptable
+                boolean peerHasSameShard = false;
+                for (RoutingNode peerNode : clusterState.getRoutingNodes()) {
+                    if (peerNode.nodeId().equals(node)) {
+                        continue;
+                    }
+                    ShardRouting sameIdShardOnPeer = peerNode.getByShardId(shard.shardId());
+                    if (sameIdShardOnPeer != null && sameIdShardOnPeer.getIndexName().equals(index)) {
+                        peerHasSameShard = true;
+                    }
+                }
+                if (peerHasSameShard) {
+                    continue;
+                }
+
+                hasHotSpot = true;
+            }
+            movesToBalance++;
+        } while (!initShards.isEmpty());
+
+        logger.info("HotSpot: [{}], Moves to balance: [{}]", hasHotSpot, movesToBalance);
+        assertEquals(expected, hasHotSpot);
+        assertForIndexShardHotSpots(false, nodes); // Post re-balancing, the cluster should always be hot spot free.
+        return movesToBalance;
+    }
+
+    public ClusterState allocateShardsAndBalance(ClusterState clusterState) {
+        int iterations = 0;
+        do {
+            clusterState = allocation.applyStartedShards(clusterState,
+                clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+            clusterState = allocation.reroute(clusterState, "reroute");
+            iterations++;
+        } while (!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()
+            && iterations < MAX_REROUTE_STEPS_ALLOWED);
+        return clusterState;
+    }
+
+    public void addNodesWithoutIndexing(int nodeCount, String node_prefix) {
+        DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(clusterState.nodes()), node_prefix, nodeCount);
+        clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
+    }
+
+    public void terminateNodes(String... nodesToTerminate) {
+        if (CollectionUtils.isEmpty(nodesToTerminate)) {
+            return;
+        }
+        DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes());
+        Arrays.asList(nodesToTerminate).forEach(node -> nb.remove(node));
+        clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build();
+        clusterState = allocation.disassociateDeadNodes(clusterState, true, "node-terminated");
+        clusterState = allocateShardsAndBalance(clusterState);
+    }
+
+    public void addNodesWithIndexing(int nodeCount, String node_prefix, int indices, int shards, int replicas) {
+        final String NEW_INDEX_PREFIX = "new_index_";
+        Metadata md = buildMetadata(Metadata.builder(clusterState.getMetadata()), NEW_INDEX_PREFIX, indices, shards,
+            replicas);
+        RoutingTable rb = buildRoutingTable(RoutingTable.builder(clusterState.getRoutingTable()), md, NEW_INDEX_PREFIX,
+            indices);
+        DiscoveryNodes nodes = addNodes(DiscoveryNodes.builder(clusterState.nodes()), node_prefix, nodeCount);
+        clusterState = ClusterState.builder(clusterState).metadata(md).routingTable(rb).nodes(nodes).build();
+        for (int i = 0; i < indices; i++) {
+            indexShardCount.put(NEW_INDEX_PREFIX + i, shards * (replicas + 1));
+        }
+    }
+
+    public void addIndices(String index_prefix, int indices, int shards, int replicas) {
+        Metadata md = buildMetadata(Metadata.builder(clusterState.getMetadata()), index_prefix, indices, shards,
+            replicas);
+        RoutingTable rb = buildRoutingTable(RoutingTable.builder(clusterState.getRoutingTable()), md, index_prefix,
+            indices);
+        clusterState = ClusterState.builder(clusterState).metadata(md).routingTable(rb).build();
+
+        if (indexShardCount == null) {
+            indexShardCount = new HashMap<>();
+        }
+
+        for (int i = 0; i < indices; i++) {
+            indexShardCount.put(index_prefix + i, shards * (replicas + 1));
+        }
+    }
+
+}
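For readers extending this base class, here is a minimal, hypothetical test built on the helpers above (the test name and numbers are made up for illustration; the per-index limit the framework enforces is ceil(totalShardsOfIndex / nodeCount)):

```java
package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.OpenSearchAllocationWithConstraintsTestCase;

public class ExampleHotSpotTests extends OpenSearchAllocationWithConstraintsTestCase {

    /**
     * Scale out a balanced 3-node cluster by one empty node and verify no index piles up on it.
     * With 5 indices of 10 shards each (no replicas), the per-index limit on 4 nodes is ceil(10 / 4) = 3.
     */
    public void testScaleOutDoesNotHotSpotNewNode() {
        setupInitialCluster(3, 5, 10, 0);        // 3 nodes, 5 indices, 10 shards each, 0 replicas
        buildAllocationService();
        addNodesWithoutIndexing(1, "new_node_"); // join one empty node
        allocateAndCheckIndexShardHotSpots(false, 4, "new_node_0"); // expect no hot spot on the new node
    }
}
```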