YARN-7339. LocalityMulticastAMRMProxyPolicy should handle cancel request properly. (Botong Huang via curino)
Edited cherry-pick from 1c5c2b5dde
This commit is contained in:
parent
736fb3b66c
commit
fa86fdc3cc
|
@ -326,10 +326,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|||
// any RM we have previously contacted (this might be the user way
|
||||
// to cancel a previous request).
|
||||
if (numContainer == 0) {
|
||||
for (SubClusterId targetId : targetSubclusters) {
|
||||
if (headroom.containsKey(targetId)) {
|
||||
allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
|
||||
}
|
||||
for (SubClusterId targetId : headroom.keySet()) {
|
||||
allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -562,24 +560,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|||
Preconditions
|
||||
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
|
||||
|
||||
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
|
||||
countContainersPerRM.put(rr.getAllocationRequestId(),
|
||||
new HashMap<SubClusterId, AtomicLong>());
|
||||
}
|
||||
if (!countContainersPerRM.get(rr.getAllocationRequestId())
|
||||
.containsKey(targetId)) {
|
||||
countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
|
||||
new AtomicLong(0));
|
||||
}
|
||||
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
|
||||
.addAndGet(rr.getNumContainers());
|
||||
if (rr.getNumContainers() > 0) {
|
||||
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
|
||||
countContainersPerRM.put(rr.getAllocationRequestId(),
|
||||
new HashMap<SubClusterId, AtomicLong>());
|
||||
}
|
||||
if (!countContainersPerRM.get(rr.getAllocationRequestId())
|
||||
.containsKey(targetId)) {
|
||||
countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
|
||||
new AtomicLong(0));
|
||||
}
|
||||
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
|
||||
.addAndGet(rr.getNumContainers());
|
||||
|
||||
if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
|
||||
totNumLocalizedContainers.put(rr.getAllocationRequestId(),
|
||||
new AtomicLong(0));
|
||||
if (!totNumLocalizedContainers
|
||||
.containsKey(rr.getAllocationRequestId())) {
|
||||
totNumLocalizedContainers.put(rr.getAllocationRequestId(),
|
||||
new AtomicLong(0));
|
||||
}
|
||||
totNumLocalizedContainers.get(rr.getAllocationRequestId())
|
||||
.addAndGet(rr.getNumContainers());
|
||||
}
|
||||
totNumLocalizedContainers.get(rr.getAllocationRequestId())
|
||||
.addAndGet(rr.getNumContainers());
|
||||
|
||||
internalAddToAnswer(targetId, rr);
|
||||
}
|
||||
|
|
|
@ -659,4 +659,61 @@ public class TestLocalityMulticastAMRMProxyPolicy
|
|||
"Expect sum to be 19 in array: " + printList(allocations), 19, sum);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelWithLocalizedResource() throws YarnException {
|
||||
// Configure policy to be 100% headroom based
|
||||
getPolicyInfo().setHeadroomAlpha(1.0f);
|
||||
|
||||
initializePolicy();
|
||||
List<ResourceRequest> resourceRequests = new ArrayList<>();
|
||||
|
||||
// Initialize the headroom map
|
||||
prepPolicyWithHeadroom();
|
||||
|
||||
// Cancel at ANY level only
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
"subcluster0-rack0", 1024, 1, 1, 1, null, false));
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
|
||||
|
||||
Map<SubClusterId, List<ResourceRequest>> response =
|
||||
((FederationAMRMProxyPolicy) getPolicy())
|
||||
.splitResourceRequests(resourceRequests);
|
||||
|
||||
checkExpectedAllocation(response, "subcluster0", 3, 1);
|
||||
checkExpectedAllocation(response, "subcluster1", 1, 0);
|
||||
checkExpectedAllocation(response, "subcluster2", 1, 0);
|
||||
checkExpectedAllocation(response, "subcluster3", -1, -1);
|
||||
checkExpectedAllocation(response, "subcluster4", -1, -1);
|
||||
checkExpectedAllocation(response, "subcluster5", -1, -1);
|
||||
|
||||
resourceRequests.clear();
|
||||
// Cancel at node level only
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
"subcluster0-rack0-host0", 1024, 1, 1, 0, null, false));
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
"subcluster0-rack0", 1024, 1, 1, 0, null, false));
|
||||
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
|
||||
ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
|
||||
|
||||
response = ((FederationAMRMProxyPolicy) getPolicy())
|
||||
.splitResourceRequests(resourceRequests);
|
||||
|
||||
/*
|
||||
* Since node request is a cancel, it should not be considered associated
|
||||
* with localized requests. Based on headroom, we expect 75 containers to
|
||||
* got to subcluster0 (60) and subcluster2 (15) according to the advertised
|
||||
* headroom (40 and 10), no containers for sublcuster1 as it advertise zero
|
||||
* headroom, and 25 to subcluster5 which has unknown headroom, and so it
|
||||
* gets 1/4th of the load
|
||||
*/
|
||||
checkExpectedAllocation(response, "subcluster0", 3, 60);
|
||||
checkExpectedAllocation(response, "subcluster1", 1, -1);
|
||||
checkExpectedAllocation(response, "subcluster2", 1, 15);
|
||||
checkExpectedAllocation(response, "subcluster5", 1, 25);
|
||||
checkTotalContainerAllocation(response, 100);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue