YARN-7339. LocalityMulticastAMRMProxyPolicy should handle cancel request properly. (Botong Huang via curino)

This commit is contained in:
Carlo Curino 2017-10-24 10:39:04 -07:00
parent 025c656572
commit 1c5c2b5dde
2 changed files with 78 additions and 19 deletions

View File

@ -326,10 +326,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// any RM we have previously contacted (this might be the user way
// to cancel a previous request).
if (numContainer == 0) {
for (SubClusterId targetId : targetSubclusters) {
if (headroom.containsKey(targetId)) {
allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
}
for (SubClusterId targetId : headroom.keySet()) {
allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
}
return;
}
@ -562,23 +560,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
Preconditions
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
}
if (!countContainersPerRM.get(rr.getAllocationRequestId())
.containsKey(targetId)) {
countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
new AtomicLong(0));
}
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
.addAndGet(rr.getNumContainers());
if (rr.getNumContainers() > 0) {
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
countContainersPerRM.put(rr.getAllocationRequestId(),
new HashMap<>());
}
if (!countContainersPerRM.get(rr.getAllocationRequestId())
.containsKey(targetId)) {
countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
new AtomicLong(0));
}
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
.addAndGet(rr.getNumContainers());
if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
totNumLocalizedContainers.put(rr.getAllocationRequestId(),
new AtomicLong(0));
if (!totNumLocalizedContainers
.containsKey(rr.getAllocationRequestId())) {
totNumLocalizedContainers.put(rr.getAllocationRequestId(),
new AtomicLong(0));
}
totNumLocalizedContainers.get(rr.getAllocationRequestId())
.addAndGet(rr.getNumContainers());
}
totNumLocalizedContainers.get(rr.getAllocationRequestId())
.addAndGet(rr.getNumContainers());
internalAddToAnswer(targetId, rr);
}

View File

@ -659,4 +659,61 @@ public class TestLocalityMulticastAMRMProxyPolicy
"Expect sum to be 19 in array: " + printList(allocations), 19, sum);
}
}
@Test
public void testCancelWithLocalizedResource() throws YarnException {
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
List<ResourceRequest> resourceRequests = new ArrayList<>();
// Initialize the headroom map
prepPolicyWithHeadroom();
// Cancel at ANY level only
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 1, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
checkExpectedAllocation(response, "subcluster0", 3, 1);
checkExpectedAllocation(response, "subcluster1", 1, 0);
checkExpectedAllocation(response, "subcluster2", 1, 0);
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", -1, -1);
resourceRequests.clear();
// Cancel at node level only
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 0, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 0, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
/*
* Since node request is a cancel, it should not be considered associated
* with localized requests. Based on headroom, we expect 75 containers to
* got to subcluster0 (60) and subcluster2 (15) according to the advertised
* headroom (40 and 10), no containers for sublcuster1 as it advertise zero
* headroom, and 25 to subcluster5 which has unknown headroom, and so it
* gets 1/4th of the load
*/
checkExpectedAllocation(response, "subcluster0", 3, 60);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 15);
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
}
}