diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 429f9f31f15..a096e2f1e28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2621,9 +2621,9 @@ public class CapacityScheduler extends // Validate placement constraint is satisfied before // committing the request. try { - if (!PlacementConstraintsUtil.canSatisfySingleConstraint( + if (!PlacementConstraintsUtil.canSatisfyConstraints( appAttempt.getApplicationId(), - schedulingRequest.getAllocationTags(), schedulerNode, + schedulingRequest, schedulerNode, rmContext.getPlacementConstraintManager(), rmContext.getAllocationTagsManager())) { LOG.debug("Failed to allocate container for application " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index ff5cb67fe05..c07c16f7cb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; @@ -54,7 +55,7 @@ public final class PlacementConstraintsUtil { } /** - * Returns true if **single** placement constraint with associated + * Returns true if single placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. * * @param targetApplicationId the application id, which could be override by @@ -148,59 +149,70 @@ public final class PlacementConstraintsUtil { return true; } - /** - * Returns true if all placement constraints are **currently** satisfied by a - * specific scheduler Node.. - * - * To do so the method retrieves and goes through all application constraint - * expressions and checks if the specific allocation is between the allowed - * min-max cardinality values under the constraint scope (Node/Rack/etc). - * - * @param applicationId applicationId, - * @param placementConstraint placement constraint. - * @param node the scheduler node - * @param tagsManager the allocation tags store - * @return true if all application constraints are satisfied by node - * @throws InvalidAllocationTagsQueryException - */ - public static boolean canSatisfySingleConstraint(ApplicationId applicationId, - PlacementConstraint placementConstraint, SchedulerNode node, - AllocationTagsManager tagsManager) + private static boolean canSatisfyConstraints(ApplicationId appId, + PlacementConstraint constraint, SchedulerNode node, + AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { - if (placementConstraint == null) { + if (constraint == null) { return true; } - // Transform to SimpleConstraint - SingleConstraintTransformer singleTransformer = - new SingleConstraintTransformer(placementConstraint); - placementConstraint = singleTransformer.transform(); - AbstractConstraint sConstraintExpr = placementConstraint.getConstraintExpr(); - SingleConstraint single = (SingleConstraint) sConstraintExpr; - return canSatisfySingleConstraint(applicationId, single, node, tagsManager); + // If this is a single constraint, transform to SingleConstraint + SingleConstraintTransformer singleTransformer = + new SingleConstraintTransformer(constraint); + constraint = singleTransformer.transform(); + AbstractConstraint sConstraintExpr = constraint.getConstraintExpr(); + + // TODO handle other type of constraints, e.g CompositeConstraint + if (sConstraintExpr instanceof SingleConstraint) { + SingleConstraint single = (SingleConstraint) sConstraintExpr; + return canSatisfySingleConstraint(appId, single, node, atm); + } else { + throw new InvalidAllocationTagsQueryException( + "Unsupported type of constraint."); + } } /** - * Returns true if all placement constraints with associated allocationTags - * are **currently** satisfied by a specific scheduler Node. - * To do so the method retrieves and goes through all application constraint - * expressions and checks if the specific allocation is between the allowed - * min-max cardinality values under the constraint scope (Node/Rack/etc). + * Returns true if the placement constraint for a given scheduling request + * is currently satisfied by the specific scheduler node. This method + * first validates the constraint specified in the request; if not specified, + * then it validates application level constraint if exists; otherwise, it + * validates the global constraint if exists. + *

+ * This method only checks whether a scheduling request can be placed + * on a node with respect to the certain placement constraint. It gives no + * guarantee that asked allocations can be eventually allocated because + * it doesn't check resource, that needs to be further decided by a scheduler. * - * @param appId the application id - * @param allocationTags the allocation tags set - * @param node the scheduler node - * @param pcm the placement constraints store - * @param tagsManager the allocation tags store - * @return true if all application constraints are satisfied by node + * @param applicationId application id + * @param request scheduling request + * @param schedulerNode node + * @param pcm placement constraint manager + * @param atm allocation tags manager + * @return true if the given node satisfies the constraint of the request * @throws InvalidAllocationTagsQueryException */ - public static boolean canSatisfySingleConstraint(ApplicationId appId, - Set allocationTags, SchedulerNode node, - PlacementConstraintManager pcm, AllocationTagsManager tagsManager) + public static boolean canSatisfyConstraints(ApplicationId applicationId, + SchedulingRequest request, SchedulerNode schedulerNode, + PlacementConstraintManager pcm, AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { - PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags); - return canSatisfySingleConstraint(appId, constraint, node, tagsManager); - } + // TODO do proper merge on different level of constraints, see YARN-7778. + // Request level constraint + PlacementConstraint constraint = request.getPlacementConstraint(); + if (constraint == null) { + // Application level constraint + constraint = pcm.getConstraint(applicationId, + request.getAllocationTags()); + if (constraint == null) { + // Global level constraint + constraint = pcm.getGlobalConstraint(request.getAllocationTags()); + if (constraint == null) { + return true; + } + } + } + return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index a0749f59fc4..cf2ed15521f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -71,8 +71,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { throws InvalidAllocationTagsQueryException { int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); if (numAllocs > 0) { - if (PlacementConstraintsUtil.canSatisfySingleConstraint(appId, - schedulingRequest.getAllocationTags(), schedulerNode, + if (PlacementConstraintsUtil.canSatisfyConstraints(appId, + schedulingRequest, schedulerNode, constraintManager, tagsManager)) { return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index dd30b61c460..9e7d71cfaae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerR import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -72,6 +73,7 @@ public class SingleConstraintAppPlacementAllocator private String targetNodePartition; private Set targetAllocationTags; private AllocationTagsManager allocationTagsManager; + private PlacementConstraintManager placementConstraintManager; public SingleConstraintAppPlacementAllocator() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -437,10 +439,9 @@ public class SingleConstraintAppPlacementAllocator // node type will be ignored. try { - return PlacementConstraintsUtil.canSatisfySingleConstraint( - appSchedulingInfo.getApplicationId(), - this.schedulingRequest.getPlacementConstraint(), node, - allocationTagsManager); + return PlacementConstraintsUtil.canSatisfyConstraints( + appSchedulingInfo.getApplicationId(), schedulingRequest, node, + placementConstraintManager, allocationTagsManager); } catch (InvalidAllocationTagsQueryException e) { LOG.warn("Failed to query node cardinality:", e); return false; @@ -527,5 +528,6 @@ public class SingleConstraintAppPlacementAllocator SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); this.allocationTagsManager = rmContext.getAllocationTagsManager(); + this.placementConstraintManager = rmContext.getPlacementConstraintManager(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 8ad726e4dcb..a5460c2d7cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,6 +40,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -64,6 +69,7 @@ public class TestPlacementConstraintsUtil { private PlacementConstraint c1, c2, c3, c4; private Set sourceTag1, sourceTag2; private Map, PlacementConstraint> constraintMap1, constraintMap2; + private AtomicLong requestID = new AtomicLong(0); @Before public void setup() { @@ -102,6 +108,22 @@ public class TestPlacementConstraintsUtil { AbstractMap.SimpleEntry::getValue)); } + private SchedulingRequest createSchedulingRequest(Set allocationTags, + PlacementConstraint constraint) { + return SchedulingRequest + .newInstance(requestID.incrementAndGet(), + Priority.newInstance(0), + ExecutionTypeRequest.newInstance(), + allocationTags, + ResourceSizing.newInstance(Resource.newInstance(1024, 3)), + constraint); + } + + private SchedulingRequest createSchedulingRequest(Set + allocationTags) { + return createSchedulingRequest(allocationTags, null); + } + @Test public void testNodeAffinityAssignment() throws InvalidAllocationTagsQueryException { @@ -117,10 +139,10 @@ public class TestPlacementConstraintsUtil { RMNode currentNode = nodeIterator.next(); FiCaSchedulerNode schedulerNode = TestUtils.getMockNode( currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode, pcm, tm)); } /** * Now place container: @@ -145,15 +167,15 @@ public class TestPlacementConstraintsUtil { tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); // 'spark' placement on Node0 should now SUCCEED - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm)); // FAIL on the rest of the nodes - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode1, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode2, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode3, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); } @Test @@ -187,16 +209,16 @@ public class TestPlacementConstraintsUtil { FiCaSchedulerNode schedulerNode3 = TestUtils .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); // 'zk' placement on Rack1 should now SUCCEED - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode0, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm)); // FAIL on the rest of the RACKs - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode2, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode3, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm)); } @Test @@ -230,15 +252,15 @@ public class TestPlacementConstraintsUtil { tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); // 'spark' placement on Node0 should now FAIL - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm)); // SUCCEED on the rest of the nodes - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode1, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode2, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag1, schedulerNode3, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); } @Test @@ -273,15 +295,15 @@ public class TestPlacementConstraintsUtil { .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); // 'zk' placement on Rack1 should FAIL - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode0, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm)); // SUCCEED on the rest of the RACKs - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode2, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, - sourceTag2, schedulerNode3, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm)); } } \ No newline at end of file