From fa86fdc3cc3cc9c4d4bfabcd6dfadcf4635dc92e Mon Sep 17 00:00:00 2001 From: Carlo Curino Date: Wed, 25 Oct 2017 13:56:35 -0700 Subject: [PATCH] YARN-7339. LocalityMulticastAMRMProxyPolicy should handle cancel request properly. (Botong Huang via curino) Edited cherry-pick from 1c5c2b5dde6f2cffc587ca8f79a18828e1b1faf9 --- .../LocalityMulticastAMRMProxyPolicy.java | 41 ++++++------- .../TestLocalityMulticastAMRMProxyPolicy.java | 57 +++++++++++++++++++ 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index e07c7c2d038..06c9eded2ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -326,10 +326,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // any RM we have previously contacted (this might be the user way // to cancel a previous request). if (numContainer == 0) { - for (SubClusterId targetId : targetSubclusters) { - if (headroom.containsKey(targetId)) { - allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); - } + for (SubClusterId targetId : headroom.keySet()) { + allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); } return; } @@ -562,24 +560,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { Preconditions .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); - if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { - countContainersPerRM.put(rr.getAllocationRequestId(), - new HashMap()); - } - if (!countContainersPerRM.get(rr.getAllocationRequestId()) - .containsKey(targetId)) { - countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, - new AtomicLong(0)); - } - countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) - .addAndGet(rr.getNumContainers()); + if (rr.getNumContainers() > 0) { + if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { + countContainersPerRM.put(rr.getAllocationRequestId(), + new HashMap()); + } + if (!countContainersPerRM.get(rr.getAllocationRequestId()) + .containsKey(targetId)) { + countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, + new AtomicLong(0)); + } + countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) + .addAndGet(rr.getNumContainers()); - if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) { - totNumLocalizedContainers.put(rr.getAllocationRequestId(), - new AtomicLong(0)); + if (!totNumLocalizedContainers + .containsKey(rr.getAllocationRequestId())) { + totNumLocalizedContainers.put(rr.getAllocationRequestId(), + new AtomicLong(0)); + } + totNumLocalizedContainers.get(rr.getAllocationRequestId()) + .addAndGet(rr.getNumContainers()); } - totNumLocalizedContainers.get(rr.getAllocationRequestId()) - .addAndGet(rr.getNumContainers()); internalAddToAnswer(targetId, rr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index 46a60115017..f66bbb6adf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -659,4 +659,61 @@ public class TestLocalityMulticastAMRMProxyPolicy "Expect sum to be 19 in array: " + printList(allocations), 19, sum); } } + + @Test + public void testCancelWithLocalizedResource() throws YarnException { + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + List resourceRequests = new ArrayList<>(); + + // Initialize the headroom map + prepPolicyWithHeadroom(); + + // Cancel at ANY level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 0, null, false)); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + checkExpectedAllocation(response, "subcluster0", 3, 1); + checkExpectedAllocation(response, "subcluster1", 1, 0); + checkExpectedAllocation(response, "subcluster2", 1, 0); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", -1, -1); + + resourceRequests.clear(); + // Cancel at node level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, false)); + + response = ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + /* + * Since node request is a cancel, it should not be considered associated + * with localized requests. Based on headroom, we expect 75 containers to + * got to subcluster0 (60) and subcluster2 (15) according to the advertised + * headroom (40 and 10), no containers for sublcuster1 as it advertise zero + * headroom, and 25 to subcluster5 which has unknown headroom, and so it + * gets 1/4th of the load + */ + checkExpectedAllocation(response, "subcluster0", 3, 60); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 15); + checkExpectedAllocation(response, "subcluster5", 1, 25); + checkTotalContainerAllocation(response, 100); + } } \ No newline at end of file