diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 238aa120eaf..549fa3485a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -558,6 +558,9 @@ public abstract class AbstractCSQueue implements CSQueue { } // ResourceUsage has its own lock, no addition lock needs here. queueUsage.incUsed(nodeLabel, resourceToInc); + CSQueueUtils.updateUsedCapacity(resourceCalculator, + labelManager.getResourceByLabel(nodeLabel, Resources.none()), + minimumAllocation, queueUsage, queueCapacities, nodeLabel); if (null != parent) { parent.incUsedResource(nodeLabel, resourceToInc, null); } @@ -571,6 +574,9 @@ public abstract class AbstractCSQueue implements CSQueue { } // ResourceUsage has its own lock, no addition lock needs here. queueUsage.decUsed(nodeLabel, resourceToDec); + CSQueueUtils.updateUsedCapacity(resourceCalculator, + labelManager.getResourceByLabel(nodeLabel, Resources.none()), + minimumAllocation, queueUsage, queueCapacities, nodeLabel); if (null != parent) { parent.decUsedResource(nodeLabel, resourceToDec, null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 2f981a74823..c402784ac00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -180,7 +180,7 @@ class CSQueueUtils { * Update partitioned resource usage, if nodePartition == null, will update * used resource for all partitions of this queue. */ - private static void updateUsedCapacity(final ResourceCalculator rc, + public static void updateUsedCapacity(final ResourceCalculator rc, final Resource totalPartitionResource, final Resource minimumAllocation, ResourceUsage queueResourceUsage, QueueCapacities queueCapacities, String nodePartition) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index fe24b2d736a..cff79cd5c1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -100,6 +100,12 @@ public class TestCapacitySchedulerNodeLabelUpdate { checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); } + private void checkUsedCapacity(MockRM rm, String queueName, int capacity, + int total) { + checkUsedCapacity(rm, queueName, capacity, total, + RMNodeLabelsManager.NO_LABEL); + } + private void checkUsedResource(MockRM rm, String queueName, int memory, String label) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); @@ -108,6 +114,15 @@ public class TestCapacitySchedulerNodeLabelUpdate { .getMemory()); } + private void checkUsedCapacity(MockRM rm, String queueName, int capacity, + int total, String label) { + float epsillon = 0.0001f; + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queueName); + Assert.assertEquals((float)capacity/total, + queue.getQueueCapacities().getUsedCapacity(label), epsillon); + } + private void checkAMUsedResource(MockRM rm, String queueName, int memory, String label) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); @@ -188,7 +203,7 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.stop(); } - @Test (timeout = 60000) + @Test public void testResourceUsageWhenNodeUpdatesPartition() throws Exception { // set node -> label @@ -233,16 +248,23 @@ public class TestCapacitySchedulerNodeLabelUpdate { // queue-a used x=1G, ""=1G checkUsedResource(rm, "a", 1024, "x"); checkUsedResource(rm, "a", 1024); + checkUsedCapacity(rm, "a", 1024, 8000, "x"); + checkUsedCapacity(rm, "a", 1024, 8000); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); // change h1's label to z + mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("z"))); cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), toSet("z")))); + Thread.sleep(100); checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 1024, "z"); checkUsedResource(rm, "a", 1024); + checkUsedCapacity(rm, "a", 0, 8000, "x"); + checkUsedCapacity(rm, "a", 1024, 8000, "z"); + checkUsedCapacity(rm, "a", 1024, 8000); checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 1024, "z"); checkUsedResource(rm, "root", 1024); @@ -254,12 +276,18 @@ public class TestCapacitySchedulerNodeLabelUpdate { app.getAppAttemptResourceUsage().getUsed("z").getMemory()); // change h1's label to y + mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("y"))); cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), toSet("y")))); + Thread.sleep(100); checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 1024, "y"); checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 1024); + checkUsedCapacity(rm, "a", 0, 8000, "x"); + checkUsedCapacity(rm, "a", 1024, 16000, "y"); + checkUsedCapacity(rm, "a", 0, 8000, "z"); + checkUsedCapacity(rm, "a", 1024, 8000); checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 1024, "y"); checkUsedResource(rm, "root", 0, "z"); @@ -278,11 +306,17 @@ public class TestCapacitySchedulerNodeLabelUpdate { Set emptyLabels = new HashSet<>(); Map> map = ImmutableMap.of(nm1.getNodeId(), emptyLabels); + mgr.replaceLabelsOnNode(map); cs.handle(new NodeLabelsUpdateSchedulerEvent(map)); + Thread.sleep(100); checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 0, "y"); checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 2048); + checkUsedCapacity(rm, "a", 0, 8000, "x"); + checkUsedCapacity(rm, "a", 0, 8000, "y"); + checkUsedCapacity(rm, "a", 0, 8000, "z"); + checkUsedCapacity(rm, "a", 2048, 16000); checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "y"); checkUsedResource(rm, "root", 0, "z"); @@ -314,6 +348,10 @@ public class TestCapacitySchedulerNodeLabelUpdate { checkUsedResource(rm, "a", 0, "y"); checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 0); + checkUsedCapacity(rm, "a", 0, 8000, "x"); + checkUsedCapacity(rm, "a", 0, 8000, "y"); + checkUsedCapacity(rm, "a", 0, 8000, "z"); + checkUsedCapacity(rm, "a", 0, 16000); checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "y"); checkUsedResource(rm, "root", 0, "z");