YARN-7783. Add validation step to ensure constraints are not violated due to order in which a request is processed. (asuresh)

Arun Suresh 2018-01-23 08:15:58 -08:00
parent 9b81cb0537
commit a4c539fcdb
2 changed files with 155 additions and 13 deletions


@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -69,13 +70,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest, schedulerNode,
constraintManager, tagsManager)) {
return true;
}
if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
return true;
}
return false;
}
@@ -93,6 +90,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
int rePlacementCount = RE_ATTEMPT_COUNT;
while (rePlacementCount > 0) {
doPlacement(requests, resp, allNodes, rejectedRequests);
// Double check if placement constraints are really satisfied
validatePlacement(requests.getApplicationId(), resp,
rejectedRequests);
if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
break;
}
@@ -122,9 +122,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
break;
}
SchedulingRequest schedulingRequest = requestIterator.next();
PlacedSchedulingRequest placedReq =
new PlacedSchedulingRequest(schedulingRequest);
placedReq.setPlacementAttempt(requests.getPlacementAttempt());
resp.getPlacedRequests().add(placedReq);
CircularIterator<SchedulerNode> nodeIter =
new CircularIterator(lastSatisfiedNode, nIter, allNodes);
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
int numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
while (nodeIter.hasNext() && numAllocs > 0) {
SchedulerNode node = nodeIter.next();
try {
@@ -135,11 +140,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
requests.getApplicationId(), schedulingRequest, node)) {
schedulingRequest.getResourceSizing()
.setNumAllocations(--numAllocs);
PlacedSchedulingRequest placedReq =
new PlacedSchedulingRequest(schedulingRequest);
placedReq.setPlacementAttempt(requests.getPlacementAttempt());
placedReq.getNodes().add(node);
resp.getPlacedRequests().add(placedReq);
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
// Add temp-container tags for current placement cycle
@@ -156,6 +157,98 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Add all requests whose numAllocations still > 0 to rejected list.
requests.getSchedulingRequests().stream()
.filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
.forEach(rejReq -> rejectedRequests.add(rejReq));
.forEach(rejReq -> rejectedRequests.add(cloneReq(rejReq)));
}
/**
* During the placement phase, allocation tags are added to the node if the
* constraint is satisfied. But depending on the order in which the
* algorithm sees the requests, it is possible that a constraint that
* happened to be valid during placement of an earlier-seen request might
* no longer be valid after all subsequent requests have been placed.
*
* For example:
* Assume nodes n1, n2, n3, n4 and n5
*
* Consider the 2 constraints:
* 1) "foo", anti-affinity with "foo"
* 2) "bar", anti-affinity with "foo"
*
* And 2 requests
* req1: NumAllocations = 4, allocTags = [foo]
* req2: NumAllocations = 1, allocTags = [bar]
*
* If "req1" is seen first, the algorithm can place the 4 containers in
* n1, n2, n3 and n4. And when it gets to "req2", it will see that 4 nodes
* with the "foo" tag and will place on n5.
* But if "req2" is seem first, then "bar" will be placed on any node,
* since no node currently has "foo", and when it gets to "req1", since
* "foo" has not anti-affinity with "bar", the algorithm can end up placing
* "foo" on a node with "bar" violating the second constraint.
*
* To prevent the above, we need a validation step: after the placements
* for a batch of requests are made, for each request we remove its tags
* from the node and check whether its constraints would still be satisfied
* if the tags were added back to the node.
*
* When applied to the example above, after "req2" and "req1" are placed,
* we remove the "bar" tag from the node and try to add it back on the node.
* This time, constraint satisfaction will fail, since there is now a "foo"
* tag on the node and "bar" cannot be added. The algorithm will then
* retry placing "req2" on another node.
*
* @param applicationId the application whose placements are validated
* @param resp the algorithm output holding the placed requests
* @param rejectedRequests list to which requests that must be re-placed are added
*/
private void validatePlacement(ApplicationId applicationId,
ConstraintPlacementAlgorithmOutput resp,
List<SchedulingRequest> rejectedRequests) {
Iterator<PlacedSchedulingRequest> pReqIter =
resp.getPlacedRequests().iterator();
while (pReqIter.hasNext()) {
PlacedSchedulingRequest pReq = pReqIter.next();
Iterator<SchedulerNode> nodeIter = pReq.getNodes().iterator();
// Number of placements for this request that fail re-validation and must be retried.
int num = 0;
while (nodeIter.hasNext()) {
SchedulerNode node = nodeIter.next();
try {
// Remove just the tags for this placement.
this.tagsManager.removeTempTags(node.getNodeID(),
applicationId, pReq.getSchedulingRequest().getAllocationTags());
if (!attemptPlacementOnNode(
applicationId, pReq.getSchedulingRequest(), node)) {
nodeIter.remove();
num++;
} else {
// Add back the tags if everything is fine.
this.tagsManager.addTempTags(node.getNodeID(),
applicationId, pReq.getSchedulingRequest().getAllocationTags());
}
} catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Got exception from TagManager !", e);
}
}
if (num > 0) {
SchedulingRequest sReq = cloneReq(pReq.getSchedulingRequest());
sReq.getResourceSizing().setNumAllocations(num);
rejectedRequests.add(sReq);
}
if (pReq.getNodes().isEmpty()) {
pReqIter.remove();
}
}
}
private static SchedulingRequest cloneReq(SchedulingRequest sReq) {
return SchedulingRequest.newInstance(
sReq.getAllocationRequestId(), sReq.getPriority(),
sReq.getExecutionType(), sReq.getAllocationTags(),
ResourceSizing.newInstance(
sReq.getResourceSizing().getNumAllocations(),
sReq.getResourceSizing().getResources()),
sReq.getPlacementConstraint());
}
}
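To make the ordering problem and the re-validation pass described in the javadoc above concrete, here is a minimal, self-contained Java sketch. It does not use the YARN scheduler APIs; all names (Node, satisfies, place, PlacementValidationSketch) are hypothetical, and the constraint check is hard-coded to the example, where both "foo" and "bar" are anti-affine with "foo" on the same node.

// Minimal, self-contained sketch (hypothetical names, NOT the YARN APIs) of
// the re-validation pass described in the javadoc above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PlacementValidationSketch {

  /** A node is just a multiset of allocation tags currently placed on it. */
  static class Node {
    final String name;
    final Map<String, Integer> tags = new HashMap<>();
    Node(String name) { this.name = name; }
    void add(String tag) { tags.merge(tag, 1, Integer::sum); }
    void remove(String tag) {
      tags.computeIfPresent(tag, (t, c) -> c > 1 ? c - 1 : null);
    }
    boolean has(String tag) { return tags.containsKey(tag); }
  }

  // In the javadoc example every tag ("foo" and "bar") is anti-affine with
  // "foo" on the same node.
  static boolean satisfies(String tag, Node node) {
    return !node.has("foo");
  }

  // Greedy placement: put 'count' containers carrying 'tag' on the first
  // nodes that currently satisfy the constraint.
  static void place(String tag, int count, List<Node> nodes,
      Map<String, Node> placements) {
    int placed = 0;
    for (Node n : nodes) {
      if (placed == count) {
        break;
      }
      if (satisfies(tag, n)) {
        n.add(tag);
        placements.put(tag + "#" + placed++, n);
      }
    }
  }

  public static void main(String[] args) {
    List<Node> nodes = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      nodes.add(new Node("n" + i));
    }

    // The "bad" order from the javadoc: req2 (one "bar") before req1 (4 "foo").
    Map<String, Node> placements = new LinkedHashMap<>();
    place("bar", 1, nodes, placements);
    place("foo", 4, nodes, placements);

    // Validation pass: remove each placement's own tag, re-check the
    // constraint, and reject (leave removed) any placement that no longer
    // holds -- the analogue of removeTempTags / attemptPlacementOnNode.
    for (Map.Entry<String, Node> p : placements.entrySet()) {
      String tag = p.getKey().split("#")[0];
      Node node = p.getValue();
      node.remove(tag);
      if (satisfies(tag, node)) {
        node.add(tag);    // still valid, add the tag back
      } else {
        System.out.println("re-place " + tag + " off " + node.name);
      }
    }
  }
}

Run with the bad order, the sketch prints "re-place bar off n1": the placement pass puts "bar" on n1 and "foo" on n1..n4, and the validation pass catches the violation by removing "bar", seeing "foo" on the node, and flagging the placement for retry, just as validatePlacement does with removeTempTags/addTempTags before the next doPlacement round.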


@@ -150,6 +150,55 @@ public class TestPlacementProcessor {
Assert.assertEquals(4, nodeIds.size());
}
@Test(timeout = 300000)
public void testMutualAntiAffinityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
MockNM nm5 = new MockNM("h5:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm5.getNodeId(), nm5);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
nm5.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
// Containers with allocationTag 'foo' are restricted to 1 per NODE, and
// containers with allocationTag 'bar' must avoid nodes that have 'foo'
Map<Set<String>, PlacementConstraint> pcMap = new HashMap<>();
pcMap.put(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
pcMap.put(Collections.singleton("bar"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, pcMap);
am1.addSchedulingRequest(
Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 4, 1, 512, "foo"),
schedulingRequest(1, 5, 1, 512, "foo")));
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
Assert.assertEquals(5, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
.collect(Collectors.toSet());
// Ensure unique nodes (antiaffinity)
Assert.assertEquals(5, nodeIds.size());
}
@Test(timeout = 300000)
public void testCardinalityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();