YARN-7763. Allow Constraints specified in the SchedulingRequest to override application level constraints. (Weiwei Yang via asuresh)

This commit is contained in:
Arun Suresh 2018-01-21 19:11:17 -08:00
parent 28fe7f3318
commit 8bf7c44436
5 changed files with 124 additions and 88 deletions

View File

@ -2621,9 +2621,9 @@ public class CapacityScheduler extends
// Validate placement constraint is satisfied before // Validate placement constraint is satisfied before
// committing the request. // committing the request.
try { try {
if (!PlacementConstraintsUtil.canSatisfySingleConstraint( if (!PlacementConstraintsUtil.canSatisfyConstraints(
appAttempt.getApplicationId(), appAttempt.getApplicationId(),
schedulingRequest.getAllocationTags(), schedulerNode, schedulingRequest, schedulerNode,
rmContext.getPlacementConstraintManager(), rmContext.getPlacementConstraintManager(),
rmContext.getAllocationTagsManager())) { rmContext.getAllocationTagsManager())) {
LOG.debug("Failed to allocate container for application " LOG.debug("Failed to allocate container for application "

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
@ -54,7 +55,7 @@ public final class PlacementConstraintsUtil {
} }
/** /**
* Returns true if **single** placement constraint with associated * Returns true if <b>single</b> placement constraint with associated
* allocationTags and scope is satisfied by a specific scheduler Node. * allocationTags and scope is satisfied by a specific scheduler Node.
* *
* @param targetApplicationId the application id, which could be overridden by * @param targetApplicationId the application id, which could be overridden by
@ -148,59 +149,70 @@ public final class PlacementConstraintsUtil {
return true; return true;
} }
/** private static boolean canSatisfyConstraints(ApplicationId appId,
* Returns true if all placement constraints are **currently** satisfied by a PlacementConstraint constraint, SchedulerNode node,
* specific scheduler Node.. AllocationTagsManager atm)
*
* To do so the method retrieves and goes through all application constraint
* expressions and checks if the specific allocation is between the allowed
* min-max cardinality values under the constraint scope (Node/Rack/etc).
*
* @param applicationId applicationId,
* @param placementConstraint placement constraint.
* @param node the scheduler node
* @param tagsManager the allocation tags store
* @return true if all application constraints are satisfied by node
* @throws InvalidAllocationTagsQueryException
*/
public static boolean canSatisfySingleConstraint(ApplicationId applicationId,
PlacementConstraint placementConstraint, SchedulerNode node,
AllocationTagsManager tagsManager)
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {
if (placementConstraint == null) { if (constraint == null) {
return true; return true;
} }
// Transform to SimpleConstraint
SingleConstraintTransformer singleTransformer =
new SingleConstraintTransformer(placementConstraint);
placementConstraint = singleTransformer.transform();
AbstractConstraint sConstraintExpr = placementConstraint.getConstraintExpr();
SingleConstraint single = (SingleConstraint) sConstraintExpr;
return canSatisfySingleConstraint(applicationId, single, node, tagsManager); // If this is a single constraint, transform to SingleConstraint
SingleConstraintTransformer singleTransformer =
new SingleConstraintTransformer(constraint);
constraint = singleTransformer.transform();
AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
// TODO handle other types of constraints, e.g. CompositeConstraint
if (sConstraintExpr instanceof SingleConstraint) {
SingleConstraint single = (SingleConstraint) sConstraintExpr;
return canSatisfySingleConstraint(appId, single, node, atm);
} else {
throw new InvalidAllocationTagsQueryException(
"Unsupported type of constraint.");
}
} }
/** /**
* Returns true if all placement constraints with associated allocationTags * Returns true if the placement constraint for a given scheduling request
* are **currently** satisfied by a specific scheduler Node. * is <b>currently</b> satisfied by the specific scheduler node. This method
* To do so the method retrieves and goes through all application constraint * first validates the constraint specified in the request; if not specified,
* expressions and checks if the specific allocation is between the allowed * then it validates application level constraint if exists; otherwise, it
* min-max cardinality values under the constraint scope (Node/Rack/etc). * validates the global constraint if exists.
* <p>
* This method only checks whether a scheduling request can be placed
* on a node with respect to the applicable placement constraint. It gives
* no guarantee that the requested allocations can eventually be allocated,
* because it does not check resources; that must be decided by the scheduler.
* *
* @param appId the application id * @param applicationId application id
* @param allocationTags the allocation tags set * @param request scheduling request
* @param node the scheduler node * @param schedulerNode node
* @param pcm the placement constraints store * @param pcm placement constraint manager
* @param tagsManager the allocation tags store * @param atm allocation tags manager
* @return true if all application constraints are satisfied by node * @return true if the given node satisfies the constraint of the request
* @throws InvalidAllocationTagsQueryException * @throws InvalidAllocationTagsQueryException
*/ */
public static boolean canSatisfySingleConstraint(ApplicationId appId, public static boolean canSatisfyConstraints(ApplicationId applicationId,
Set<String> allocationTags, SchedulerNode node, SchedulingRequest request, SchedulerNode schedulerNode,
PlacementConstraintManager pcm, AllocationTagsManager tagsManager) PlacementConstraintManager pcm, AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {
PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags); // TODO do a proper merge of the different levels of constraints, see YARN-7778.
return canSatisfySingleConstraint(appId, constraint, node, tagsManager);
}
// Request level constraint
PlacementConstraint constraint = request.getPlacementConstraint();
if (constraint == null) {
// Application level constraint
constraint = pcm.getConstraint(applicationId,
request.getAllocationTags());
if (constraint == null) {
// Global level constraint
constraint = pcm.getGlobalConstraint(request.getAllocationTags());
if (constraint == null) {
return true;
}
}
}
return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm);
}
} }

View File

@ -71,8 +71,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) { if (numAllocs > 0) {
if (PlacementConstraintsUtil.canSatisfySingleConstraint(appId, if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest.getAllocationTags(), schedulerNode, schedulingRequest, schedulerNode,
constraintManager, tagsManager)) { constraintManager, tagsManager)) {
return true; return true;
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerR
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -72,6 +73,7 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
private String targetNodePartition; private String targetNodePartition;
private Set<String> targetAllocationTags; private Set<String> targetAllocationTags;
private AllocationTagsManager allocationTagsManager; private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
public SingleConstraintAppPlacementAllocator() { public SingleConstraintAppPlacementAllocator() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -437,10 +439,9 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
// node type will be ignored. // node type will be ignored.
try { try {
return PlacementConstraintsUtil.canSatisfySingleConstraint( return PlacementConstraintsUtil.canSatisfyConstraints(
appSchedulingInfo.getApplicationId(), appSchedulingInfo.getApplicationId(), schedulingRequest, node,
this.schedulingRequest.getPlacementConstraint(), node, placementConstraintManager, allocationTagsManager);
allocationTagsManager);
} catch (InvalidAllocationTagsQueryException e) { } catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Failed to query node cardinality:", e); LOG.warn("Failed to query node cardinality:", e);
return false; return false;
@ -527,5 +528,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
this.allocationTagsManager = rmContext.getAllocationTagsManager(); this.allocationTagsManager = rmContext.getAllocationTagsManager();
this.placementConstraintManager = rmContext.getPlacementConstraintManager();
} }
} }

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -39,6 +40,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -64,6 +69,7 @@ public class TestPlacementConstraintsUtil {
private PlacementConstraint c1, c2, c3, c4; private PlacementConstraint c1, c2, c3, c4;
private Set<String> sourceTag1, sourceTag2; private Set<String> sourceTag1, sourceTag2;
private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2; private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
private AtomicLong requestID = new AtomicLong(0);
@Before @Before
public void setup() { public void setup() {
@ -102,6 +108,22 @@ public class TestPlacementConstraintsUtil {
AbstractMap.SimpleEntry::getValue)); AbstractMap.SimpleEntry::getValue));
} }
/**
 * Builds a {@link SchedulingRequest} for use by the tests in this class.
 * Every call takes the next value of the {@code requestID} counter as the
 * first factory argument; the request is created with
 * {@code Priority.newInstance(0)}, a no-argument
 * {@code ExecutionTypeRequest} and a sizing built from
 * {@code Resource.newInstance(1024, 3)}.
 *
 * @param allocationTags allocation tags to attach to the request
 * @param constraint request level placement constraint; callers in this
 *                   class may pass {@code null}
 * @return the newly created scheduling request
 */
private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
    PlacementConstraint constraint) {
  // Locals are initialised in the same order the factory arguments were
  // originally evaluated.
  final long nextId = requestID.incrementAndGet();
  final Priority priority = Priority.newInstance(0);
  final ExecutionTypeRequest executionType =
      ExecutionTypeRequest.newInstance();
  final ResourceSizing sizing =
      ResourceSizing.newInstance(Resource.newInstance(1024, 3));
  return SchedulingRequest.newInstance(nextId, priority, executionType,
      allocationTags, sizing, constraint);
}
/**
 * Builds a {@link SchedulingRequest} that carries only allocation tags, by
 * delegating to the two-argument overload with a {@code null} constraint.
 *
 * @param allocationTags allocation tags to attach to the request
 * @return the newly created scheduling request
 */
private SchedulingRequest createSchedulingRequest(
    Set<String> allocationTags) {
  // No request level constraint is supplied.
  final PlacementConstraint noConstraint = null;
  return createSchedulingRequest(allocationTags, noConstraint);
}
@Test @Test
public void testNodeAffinityAssignment() public void testNodeAffinityAssignment()
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {
@ -117,10 +139,10 @@ public class TestPlacementConstraintsUtil {
RMNode currentNode = nodeIterator.next(); RMNode currentNode = nodeIterator.next();
FiCaSchedulerNode schedulerNode = TestUtils.getMockNode( FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB); currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode, pcm, tm));
} }
/** /**
* Now place container: * Now place container:
@ -145,15 +167,15 @@ public class TestPlacementConstraintsUtil {
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now SUCCEED // 'spark' placement on Node0 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode0, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
// FAIL on the rest of the nodes // FAIL on the rest of the nodes
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode1, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode2, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode3, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
} }
@Test @Test
@ -187,16 +209,16 @@ public class TestPlacementConstraintsUtil {
FiCaSchedulerNode schedulerNode3 = TestUtils FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should now SUCCEED // 'zk' placement on Rack1 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode0, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode1, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
// FAIL on the rest of the RACKs // FAIL on the rest of the RACKs
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode2, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode3, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
} }
@Test @Test
@ -230,15 +252,15 @@ public class TestPlacementConstraintsUtil {
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now FAIL // 'spark' placement on Node0 should now FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode0, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
// SUCCEED on the rest of the nodes // SUCCEED on the rest of the nodes
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode1, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode2, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag1, schedulerNode3, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
} }
@Test @Test
@ -273,15 +295,15 @@ public class TestPlacementConstraintsUtil {
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should FAIL // 'zk' placement on Rack1 should FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode0, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode1, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
// SUCCEED on the rest of the RACKs // SUCCEED on the rest of the RACKs
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode2, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
sourceTag2, schedulerNode3, pcm, tm)); createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
} }
} }