From 8b200ab39576be408eb659ca66b594784aeb6d57 Mon Sep 17 00:00:00 2001 From: Carlo Curino Date: Tue, 24 Oct 2017 10:39:04 -0700 Subject: [PATCH] YARN-7339. LocalityMulticastAMRMProxyPolicy should handle cancel request properly. (Botong Huang via curino) (cherry picked from commit 1c5c2b5dde6f2cffc587ca8f79a18828e1b1faf9) --- .../LocalityMulticastAMRMProxyPolicy.java | 40 ++++++------- .../TestLocalityMulticastAMRMProxyPolicy.java | 57 +++++++++++++++++++ 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index da30d98ff58..d303d6ff14b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -326,10 +326,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // any RM we have previously contacted (this might be the user way // to cancel a previous request). if (numContainer == 0) { - for (SubClusterId targetId : targetSubclusters) { - if (headroom.containsKey(targetId)) { - allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); - } + for (SubClusterId targetId : headroom.keySet()) { + allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); } return; } @@ -562,23 +560,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { Preconditions .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); - if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { - countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>()); - } - if (!countContainersPerRM.get(rr.getAllocationRequestId()) - .containsKey(targetId)) { - countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, - new AtomicLong(0)); - } - countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) - .addAndGet(rr.getNumContainers()); + if (rr.getNumContainers() > 0) { + if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { + countContainersPerRM.put(rr.getAllocationRequestId(), + new HashMap<>()); + } + if (!countContainersPerRM.get(rr.getAllocationRequestId()) + .containsKey(targetId)) { + countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, + new AtomicLong(0)); + } + countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) + .addAndGet(rr.getNumContainers()); - if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) { - totNumLocalizedContainers.put(rr.getAllocationRequestId(), - new AtomicLong(0)); + if (!totNumLocalizedContainers + .containsKey(rr.getAllocationRequestId())) { + totNumLocalizedContainers.put(rr.getAllocationRequestId(), + new AtomicLong(0)); + } + totNumLocalizedContainers.get(rr.getAllocationRequestId()) + .addAndGet(rr.getNumContainers()); } - totNumLocalizedContainers.get(rr.getAllocationRequestId()) - .addAndGet(rr.getNumContainers()); internalAddToAnswer(targetId, rr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index 46a60115017..f66bbb6adf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -659,4 +659,61 @@ public class TestLocalityMulticastAMRMProxyPolicy "Expect sum to be 19 in array: " + printList(allocations), 19, sum); } } + + @Test + public void testCancelWithLocalizedResource() throws YarnException { + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + List resourceRequests = new ArrayList<>(); + + // Initialize the headroom map + prepPolicyWithHeadroom(); + + // Cancel at ANY level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 0, null, false)); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + checkExpectedAllocation(response, "subcluster0", 3, 1); + checkExpectedAllocation(response, "subcluster1", 1, 0); + checkExpectedAllocation(response, "subcluster2", 1, 0); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", -1, -1); + + resourceRequests.clear(); + // Cancel at node level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, false)); + + response = ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + /* + * Since node request is a cancel, it should not be considered associated + * with localized requests. Based on headroom, we expect 75 containers to + * got to subcluster0 (60) and subcluster2 (15) according to the advertised + * headroom (40 and 10), no containers for sublcuster1 as it advertise zero + * headroom, and 25 to subcluster5 which has unknown headroom, and so it + * gets 1/4th of the load + */ + checkExpectedAllocation(response, "subcluster0", 3, 60); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 15); + checkExpectedAllocation(response, "subcluster5", 1, 25); + checkTotalContainerAllocation(response, 100); + } } \ No newline at end of file