diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
new file mode 100644
index 00000000000..956a3c95051
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+
+/**
+ * This class contains various static methods used by the Placement Algorithms
+ * to simplify constrained placement.
+ * (see also {@link DefaultPlacementAlgorithm}).
+ */
+@Public
+@Unstable
+public final class PlacementConstraintsUtil {
+
+ // Suppresses default constructor, ensuring non-instantiability.
+ private PlacementConstraintsUtil() {
+ }
+
+ /**
+ * Returns true if **single** application constraint with associated
+ * allocationTags and scope is satisfied by a specific scheduler Node.
+ *
+ * @param appId the application id
+ * @param sc the placement constraint
+ * @param te the target expression
+ * @param node the scheduler node
+ * @param tm the allocation tags store
+ * @return true if single application constraint is satisfied by node
+ * @throws InvalidAllocationTagsQueryException
+ */
+ private static boolean canSatisfySingleConstraintExpression(
+ ApplicationId appId, SingleConstraint sc, TargetExpression te,
+ SchedulerNode node, AllocationTagsManager tm)
+ throws InvalidAllocationTagsQueryException {
+ long minScopeCardinality = 0;
+ long maxScopeCardinality = 0;
+ if (sc.getScope() == PlacementConstraints.NODE) {
+ minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+ te.getTargetValues(), Long::max);
+ maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+ te.getTargetValues(), Long::min);
+ } else if (sc.getScope() == PlacementConstraints.RACK) {
+ minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+ te.getTargetValues(), Long::max);
+ maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+ te.getTargetValues(), Long::min);
+ }
+ // Make sure Anti-affinity satisfies hard upper limit
+ maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+ : maxScopeCardinality;
+
+ return (minScopeCardinality >= sc.getMinCardinality()
+ && maxScopeCardinality < sc.getMaxCardinality());
+ }
+
+ /**
+ * Returns true if all application constraints with associated allocationTags
+ * are **currently** satisfied by a specific scheduler Node.
+ * To do so the method retrieves and goes through all application constraint
+ * expressions and checks if the specific allocation is between the allowed
+ * min-max cardinality values under the constraint scope (Node/Rack/etc).
+ *
+ * @param appId the application id
+ * @param allocationTags the allocation tags set
+ * @param node the scheduler node
+ * @param pcm the placement constraints store
+ * @param tagsManager the allocation tags store
+ * @return true if all application constraints are satisfied by node
+ * @throws InvalidAllocationTagsQueryException
+ */
+ public static boolean canSatisfyConstraints(ApplicationId appId,
+ Set allocationTags, SchedulerNode node,
+ PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
+ if (constraint == null) {
+ return true;
+ }
+ // Transform to SimpleConstraint
+ SingleConstraintTransformer singleTransformer =
+ new SingleConstraintTransformer(constraint);
+ constraint = singleTransformer.transform();
+ AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
+ SingleConstraint single = (SingleConstraint) sConstraintExpr;
+ // Iterate through TargetExpressions
+ Iterator expIt = single.getTargetExpressions().iterator();
+ while (expIt.hasNext()) {
+ TargetExpression currentExp = expIt.next();
+ // Supporting AllocationTag Expressions for now
+ if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+ // Check if conditions are met
+ if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
+ node, tagsManager)) {
+ return false;
+ }
+ }
+ }
+ // return true if all targetExpressions are satisfied
+ return true;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 395c1560c40..9ed9ab13b91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -19,19 +19,16 @@
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
@@ -65,58 +62,14 @@ public void init(RMContext rmContext) {
.getNodes(filter);
}
- /**
- * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
- * @param applicationId
- * @param allocationTags
- * @param nodeId
- * @param tagsManager
- * @return boolean
- * @throws InvalidAllocationTagsQueryException
- */
- public boolean canAssign(ApplicationId applicationId,
- Set allocationTags, NodeId nodeId,
- AllocationTagsManager tagsManager)
- throws InvalidAllocationTagsQueryException {
- PlacementConstraint constraint =
- constraintManager.getConstraint(applicationId, allocationTags);
- if (constraint == null) {
- return true;
- }
- // TODO: proper transformations
- // Currently works only for simple anti-affinity
- // NODE scope target expressions
- PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
- new PlacementConstraintTransformations.SpecializedConstraintTransformer(
- constraint);
- PlacementConstraint transform = transformer.transform();
- PlacementConstraint.TargetConstraint targetConstraint =
- (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
- // Assume a single target expression tag;
- // The Sample Algorithm assumes a constraint will always be a simple
- // Target Constraint with a single entry in the target set.
- // As mentioned in the class javadoc - This algorithm should be
- // used mostly for testing and validating end-2-end workflow.
- String targetTag = targetConstraint.getTargetExpressions().iterator().next()
- .getTargetValues().iterator().next();
- // TODO: Assuming anti-affinity constraint
- long nodeCardinality =
- tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
- if (nodeCardinality != 0) {
- return false;
- }
- // return true if it is a valid placement
- return true;
- }
-
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
- if (canAssign(appId,
- schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
- tagsManager)) {
+ if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+ schedulingRequest.getAllocationTags(), schedulerNode,
+ constraintManager, tagsManager)) {
return true;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
new file mode 100644
index 00000000000..7492233165c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Test the PlacementConstraint Utility class functionality.
+ */
+public class TestPlacementConstraintsUtil {
+
+ private List rmNodes;
+ private RMContext rmContext;
+ private static final int GB = 1024;
+ private ApplicationId appId1;
+ private PlacementConstraint c1, c2, c3, c4;
+ private Set sourceTag1, sourceTag2;
+ private Map, PlacementConstraint> constraintMap1, constraintMap2;
+
+ @Before
+ public void setup() {
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNodes.resetHostIds();
+ rmNodes = MockNodes.newNodes(2, 2, Resource.newInstance(4096, 4));
+ for (RMNode rmNode : rmNodes) {
+ rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
+ }
+ rmContext = rm.getRMContext();
+
+ // Build appIDs, constraints, source tags, and constraint map.
+ long ts = System.currentTimeMillis();
+ appId1 = BuilderUtils.newApplicationId(ts, 123);
+
+ c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
+ c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
+ c3 = PlacementConstraints
+ .build(targetNotIn(NODE, allocationTag("hbase-m")));
+ c4 = PlacementConstraints
+ .build(targetNotIn(RACK, allocationTag("hbase-rs")));
+
+ sourceTag1 = new HashSet<>(Arrays.asList("spark"));
+ sourceTag2 = new HashSet<>(Arrays.asList("zk"));
+
+ constraintMap1 = Stream
+ .of(new AbstractMap.SimpleEntry<>(sourceTag1, c1),
+ new AbstractMap.SimpleEntry<>(sourceTag2, c2))
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+ AbstractMap.SimpleEntry::getValue));
+ constraintMap2 = Stream
+ .of(new AbstractMap.SimpleEntry<>(sourceTag1, c3),
+ new AbstractMap.SimpleEntry<>(sourceTag2, c4))
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+ AbstractMap.SimpleEntry::getValue));
+ }
+
+ @Test
+ public void testNodeAffinityAssignment()
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ // Register App1 with affinity constraint map
+ pcm.registerApplication(appId1, constraintMap1);
+ // No containers are running so all 'zk' and 'spark' allocations should fail
+ // on every cluster NODE
+ Iterator nodeIterator = rmNodes.iterator();
+ while (nodeIterator.hasNext()) {
+ RMNode currentNode = nodeIterator.next();
+ FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
+ currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode, pcm, tm));
+ }
+ /**
+ * Now place container:
+ * Node0:123 (Rack1):
+ * container_app1_1 (hbase-m)
+ */
+ RMNode n0_r1 = rmNodes.get(0);
+ RMNode n1_r1 = rmNodes.get(1);
+ RMNode n2_r2 = rmNodes.get(2);
+ RMNode n3_r2 = rmNodes.get(3);
+ FiCaSchedulerNode schedulerNode0 = TestUtils
+ .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode1 = TestUtils
+ .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode2 = TestUtils
+ .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode3 = TestUtils
+ .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ // 1 Containers on node 0 with allocationTag 'hbase-m'
+ ContainerId hbase_m = ContainerId
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+ tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
+
+ // 'spark' placement on Node0 should now SUCCEED
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode0, pcm, tm));
+ // FAIL on the rest of the nodes
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode1, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode2, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode3, pcm, tm));
+ }
+
+ @Test
+ public void testRackAffinityAssignment()
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ // Register App1 with affinity constraint map
+ pcm.registerApplication(appId1, constraintMap1);
+ /**
+ * Now place container:
+ * Node0:123 (Rack1):
+ * container_app1_1 (hbase-rs)
+ */
+ RMNode n0_r1 = rmNodes.get(0);
+ RMNode n1_r1 = rmNodes.get(1);
+ RMNode n2_r2 = rmNodes.get(2);
+ RMNode n3_r2 = rmNodes.get(3);
+ // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
+ ContainerId hbase_m = ContainerId
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+ tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
+
+ FiCaSchedulerNode schedulerNode0 = TestUtils
+ .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode1 = TestUtils
+ .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode2 = TestUtils
+ .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode3 = TestUtils
+ .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ // 'zk' placement on Rack1 should now SUCCEED
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode0, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode1, pcm, tm));
+
+ // FAIL on the rest of the RACKs
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode2, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode3, pcm, tm));
+ }
+
+ @Test
+ public void testNodeAntiAffinityAssignment()
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ // Register App1 with anti-affinity constraint map
+ pcm.registerApplication(appId1, constraintMap2);
+ /**
+ * place container:
+ * Node0:123 (Rack1):
+ * container_app1_1 (hbase-m)
+ */
+ RMNode n0_r1 = rmNodes.get(0);
+ RMNode n1_r1 = rmNodes.get(1);
+ RMNode n2_r2 = rmNodes.get(2);
+ RMNode n3_r2 = rmNodes.get(3);
+ FiCaSchedulerNode schedulerNode0 = TestUtils
+ .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode1 = TestUtils
+ .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode2 = TestUtils
+ .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode3 = TestUtils
+ .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+ // 1 Containers on node 0 with allocationTag 'hbase-m'
+ ContainerId hbase_m = ContainerId
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+ tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
+
+ // 'spark' placement on Node0 should now FAIL
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode0, pcm, tm));
+ // SUCCEED on the rest of the nodes
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode1, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag1, schedulerNode3, pcm, tm));
+ }
+
+ @Test
+ public void testRackAntiAffinityAssignment()
+ throws InvalidAllocationTagsQueryException {
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ // Register App1 with anti-affinity constraint map
+ pcm.registerApplication(appId1, constraintMap2);
+ /**
+ * Place container:
+ * Node0:123 (Rack1):
+ * container_app1_1 (hbase-rs)
+ */
+ RMNode n0_r1 = rmNodes.get(0);
+ RMNode n1_r1 = rmNodes.get(1);
+ RMNode n2_r2 = rmNodes.get(2);
+ RMNode n3_r2 = rmNodes.get(3);
+ // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
+ ContainerId hbase_m = ContainerId
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+ tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
+
+ FiCaSchedulerNode schedulerNode0 = TestUtils
+ .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode1 = TestUtils
+ .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode2 = TestUtils
+ .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+ FiCaSchedulerNode schedulerNode3 = TestUtils
+ .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+ // 'zk' placement on Rack1 should FAIL
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode0, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode1, pcm, tm));
+
+ // SUCCEED on the rest of the RACKs
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ sourceTag2, schedulerNode3, pcm, tm));
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 87dd5b71117..c260fe06ae2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,16 +49,21 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
/**
* This tests end2end workflow of the constraint placement framework.
@@ -104,7 +110,7 @@ public void stopRM() {
}
@Test(timeout = 300000)
- public void testPlacement() throws Exception {
+ public void testAntiAffinityPlacement() throws Exception {
HashMap nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
@@ -120,43 +126,173 @@ public void testPlacement() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
- Collections.singletonMap(
- Collections.singleton("foo"),
+ Collections.singletonMap(Collections.singleton("foo"),
PlacementConstraints.build(
- PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
- ));
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
am1.addSchedulingRequest(
- Arrays.asList(
- schedulingRequest(1, 1, 1, 512, "foo"),
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
- schedulingRequest(1, 5, 1, 512, "foo"))
- );
+ schedulingRequest(1, 5, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
-
- while (allocatedContainers.size() < 4) {
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- nm3.nodeHeartbeat(true);
- nm4.nodeHeartbeat(true);
- LOG.info("Waiting for containers to be created for app 1...");
- sleep(1000);
- allocResponse = am1.schedule();
- allocatedContainers.addAll(allocResponse.getAllocatedContainers());
- }
+ waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
Assert.assertEquals(4, allocatedContainers.size());
- Set nodeIds = allocatedContainers.stream()
- .map(x -> x.getNodeId()).collect(Collectors.toSet());
- // Ensure unique nodes
+ Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+ .collect(Collectors.toSet());
+ // Ensure unique nodes (antiaffinity)
Assert.assertEquals(4, nodeIds.size());
}
+ @Test(timeout = 300000)
+ public void testCardinalityPlacement() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' should not exceed 4 per NODE
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+ Collections.singletonMap(Collections.singleton("foo"),
+ PlacementConstraints.build(PlacementConstraints
+ .targetCardinality(NODE, 0, 4, allocationTag("foo")))));
+ am1.addSchedulingRequest(
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 4, 1, 512, "foo"),
+ schedulingRequest(1, 5, 1, 512, "foo"),
+ schedulingRequest(1, 6, 1, 512, "foo"),
+ schedulingRequest(1, 7, 1, 512, "foo"),
+ schedulingRequest(1, 8, 1, 512, "foo")));
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List allocatedContainers = new ArrayList<>();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+ // kick the scheduler
+ waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
+
+ Assert.assertEquals(8, allocatedContainers.size());
+ Map nodeIdContainerIdMap =
+ allocatedContainers.stream().collect(
+ Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+ // Ensure no more than 4 containers per node
+ for (NodeId n : nodeIdContainerIdMap.keySet()) {
+ Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
+ }
+ }
+
+ @Test(timeout = 300000)
+ public void testAffinityPlacement() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' should be placed where
+ // containers with allocationTag 'bar' are already running
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+ Collections.singletonMap(Collections.singleton("foo"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+ am1.addSchedulingRequest(
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 4, 1, 512, "foo"),
+ schedulingRequest(1, 5, 1, 512, "foo")));
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List allocatedContainers = new ArrayList<>();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+ // kick the scheduler
+ waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+ Assert.assertEquals(5, allocatedContainers.size());
+ Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+ .collect(Collectors.toSet());
+ // Ensure all containers end up on the same node (affinity)
+ Assert.assertEquals(1, nodeIds.size());
+ }
+
+ @Test(timeout = 300000)
+ public void testComplexPlacement() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ Map, PlacementConstraint> constraintMap = new HashMap<>();
+ // Containers with allocationTag 'bar' should not exceed 1 per NODE
+ constraintMap.put(Collections.singleton("bar"),
+ PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar"))));
+ // Containers with allocationTag 'foo' should be placed where 'bar' exists
+ constraintMap.put(Collections.singleton("foo"),
+ PlacementConstraints.build(targetIn(NODE, allocationTag("bar"))));
+ // Containers with allocationTag 'foo' should not exceed 2 per NODE
+ constraintMap.put(Collections.singleton("foo"), PlacementConstraints
+ .build(targetCardinality(NODE, 0, 2, allocationTag("foo"))));
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, constraintMap);
+ am1.addSchedulingRequest(
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+ schedulingRequest(1, 2, 1, 512, "bar"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 4, 1, 512, "foo"),
+ schedulingRequest(1, 5, 1, 512, "foo"),
+ schedulingRequest(1, 6, 1, 512, "foo")));
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List allocatedContainers = new ArrayList<>();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+ // kick the scheduler
+ waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
+
+ Assert.assertEquals(6, allocatedContainers.size());
+ Map nodeIdContainerIdMap =
+ allocatedContainers.stream().collect(
+ Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+ // Ensure no more than 3 containers per node (1 'bar', 2 'foo')
+ for (NodeId n : nodeIdContainerIdMap.keySet()) {
+ Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
+ }
+ }
+
@Test(timeout = 300000)
public void testSchedulerRejection() throws Exception {
HashMap nodes = new HashMap<>();
@@ -174,6 +310,7 @@ public void testSchedulerRejection() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -196,7 +333,6 @@ public void testSchedulerRejection() throws Exception {
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
// kick the scheduler
-
while (allocCount < 11) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
@@ -253,9 +389,10 @@ public void testRePlacementAfterSchedulerRejection() throws Exception {
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
- // No not register nm5 yet..
+ // Do not register nm5 yet..
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -323,6 +460,7 @@ public void testPlacementRejection() throws Exception {
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
@@ -346,7 +484,6 @@ public void testPlacementRejection() throws Exception {
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
// kick the scheduler
-
while (allocCount < 11) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
@@ -373,6 +510,21 @@ public void testPlacementRejection() throws Exception {
rej.getReason());
}
+ private static void waitForContainerAllocation(Collection nodes,
+ MockAM am, List allocatedContainers, int containerNum)
+ throws Exception {
+ while (allocatedContainers.size() < containerNum) {
+ for (MockNM node : nodes) {
+ node.nodeHeartbeat(true);
+ }
+ LOG.info("Waiting for containers to be created for "
+ + am.getApplicationAttemptId().getApplicationId() + "...");
+ sleep(1000);
+ AllocateResponse allocResponse = am.schedule();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ }
+ }
+
protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(priority, allocReqId, cores, mem,