Fix for bug showing incorrect awareness attributes count in AwarenessAllocationDecider (#3428)

* Fix for bug showing incorrect awareness attributes count in AwarenessAllocationDecider

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
This commit is contained in:
Anshu Agarwal 2022-06-14 11:29:52 +05:30 committed by GitHub
parent cce0781b77
commit 2f9f8e1e3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 4 deletions

View File

@ -33,11 +33,14 @@
package org.opensearch.cluster.routing.allocation.decider; package org.opensearch.cluster.routing.allocation.decider;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import com.carrotsearch.hppc.ObjectIntHashMap; import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRouting;
@ -207,12 +210,14 @@ public class AwarenessAllocationDecider extends AllocationDecider {
int numberOfAttributes = nodesPerAttribute.size(); int numberOfAttributes = nodesPerAttribute.size();
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute); List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);
if (fullValues != null) { if (fullValues != null) {
for (String fullValue : fullValues) { // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes)))
if (shardPerAttribute.containsKey(fullValue) == false) { Set<String> attributesSet = new HashSet<>(fullValues);
numberOfAttributes++; for (ObjectCursor<String> stringObjectCursor : nodesPerAttribute.keys()) {
} attributesSet.add(stringObjectCursor.value);
} }
numberOfAttributes = attributesSet.size();
} }
// TODO should we remove ones that are not part of full list? // TODO should we remove ones that are not part of full list?

View File

@ -35,23 +35,32 @@ package org.opensearch.cluster.routing.allocation;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
@ -971,4 +980,87 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
} }
public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() {
Settings settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();
AllocationService strategy = createAllocationService(settings);
logger.info("Building initial routing table for 'testAllocationExplainForUnassignedShardsWithUnbalancedZones'");
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(initialRoutingTable)
.build();
logger.info("--> adding 3 nodes in different zones and do rerouting");
clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
)
.build();
clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the shard (primary)");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
// One Shard is unassigned due to forced zone awareness
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
List<ShardRouting> unassignedShards = clusterState.getRoutingTable().shardsWithState(UNASSIGNED);
ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// Add a new node in zone c
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("C-0", singletonMap("zone", "c"))))
.build();
final AwarenessAllocationDecider decider = new AwarenessAllocationDecider(settings, EMPTY_CLUSTER_SETTINGS);
final RoutingAllocation allocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(decider)),
clusterState.getRoutingNodes(),
clusterState,
null,
null,
0L
);
allocation.debugDecision(true);
Decision decision = null;
RoutingNodes nodes = clusterState.getRoutingNodes();
for (RoutingNode node : nodes) {
// Try to allocate unassigned shard to A-0, fails because of forced zone awareness
if (node.nodeId().equals("A-0")) {
decision = decider.canAllocate(unassignedShards.get(0), node, allocation);
assertEquals(Decision.Type.NO, decision.type());
assertEquals(
decision.getExplanation(),
"there are too many copies of the shard allocated to nodes with attribute"
+ " [zone], there are [3] total configured shard copies for this shard id and [3]"
+ " total attribute values, expected the allocated shard count per attribute [2] to"
+ " be less than or equal to the upper bound of the required number of shards per attribute [1]"
);
}
}
}
} }