From 855e0f8b0064db0fa90d616216230e523c84d39a Mon Sep 17 00:00:00 2001
From: Varun Vasudev <vvasudev@apache.org>
Date: Tue, 1 Sep 2015 14:19:11 +0530
Subject: [PATCH] YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda Tan.

(cherry picked from commit bf669b6d9f8ba165e30b8823218d625a49958925)
---
 hadoop-yarn-project/CHANGES.txt                    |   3 +
 .../scheduler/capacity/AbstractCSQueue.java        |  27 ++
 .../scheduler/capacity/CSQueue.java                |  26 ++
 .../scheduler/capacity/CapacityScheduler.java      |  40 +--
 .../scheduler/capacity/LeafQueue.java              |  16 ++
 .../common/fica/FiCaSchedulerApp.java              |   9 +
 .../TestCapacitySchedulerNodeLabelUpdate.java      | 255 +++++++++++++++---
 7 files changed, 317 insertions(+), 59 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 837d3cbe9e4..c19bd7b16af 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -752,6 +752,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its
     response id has not been reset synchronously. (Jun Gong via rohithsharmaks)
 
+    YARN-4082. Container shouldn't be killed when node's label updated.
+    (Wangda Tan via vvasudev)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 792c25c332b..0ae4d1a85eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -543,6 +544,32 @@ public abstract class AbstractCSQueue implements CSQueue {
     }
   }
 
+  @Override
+  public void incUsedResource(String nodeLabel, Resource resourceToInc,
+      SchedulerApplicationAttempt application) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no additional lock is needed here.
+    queueUsage.incUsed(nodeLabel, resourceToInc);
+    if (null != parent) {
+      parent.incUsedResource(nodeLabel, resourceToInc, null);
+    }
+  }
+
+  @Override
+  public void decUsedResource(String nodeLabel, Resource resourceToDec,
+      SchedulerApplicationAttempt application) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no additional lock is needed here.
+    queueUsage.decUsed(nodeLabel, resourceToDec);
+    if (null != parent) {
+      parent.decUsedResource(nodeLabel, resourceToDec, null);
+    }
+  }
+
   /**
    * Return if the queue has pending resource on given nodePartition and
    * schedulingMode.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index b06a646cec9..9855dd4882a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -287,4 +288,29 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @return resourceUsage
    */
  public ResourceUsage getQueueResourceUsage();
+
+  /**
+   * When the partition of a node is updated, update the queue's resource
+   * usage if it has container(s) running on that partition.
+   */
+  public void incUsedResource(String nodePartition, Resource resourceToInc,
+      SchedulerApplicationAttempt application);
+
+  /**
+   * When the partition of a node is updated, update the queue's resource
+   * usage if it has container(s) running on that partition.
+   */
+  public void decUsedResource(String nodePartition, Resource resourceToDec,
+      SchedulerApplicationAttempt application);
+
+  /**
+   * When an outstanding resource request is fulfilled or canceled, calling
+   * this will decrease the pending resource in a queue.
+   *
+   * @param nodeLabel
+   *          node label of the resource request
+   * @param resourceToDec
+   *          amount of pending resource to decrease
+   */
+  public void decPendingResource(String nodeLabel, Resource resourceToDec);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cff1fe5515b..b5ccbd900c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1040,12 +1040,6 @@ public class CapacityScheduler extends
 
   /**
    * Process node labels update on a node.
-   *
-   * TODO: Currently capacity scheduler will kill containers on a node when
-   * labels on the node changed. It is a simply solution to ensure guaranteed
-   * capacity on labels of queues. When YARN-2498 completed, we can let
-   * preemption policy to decide if such containers need to be killed or just
-   * keep them running.
    */
   private synchronized void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
@@ -1060,17 +1054,31 @@
       return;
     }
 
-    // Kill running containers since label is changed
+    // Get the new partition; we have only one partition per node
+    String newPartition;
+    if (newLabels.isEmpty()) {
+      newPartition = RMNodeLabelsManager.NO_LABEL;
+    } else {
+      newPartition = newLabels.iterator().next();
+    }
+
+    // old partition as well
+    String oldPartition = node.getPartition();
+
+    // Update resources of these containers
     for (RMContainer rmContainer : node.getRunningContainers()) {
-      ContainerId containerId = rmContainer.getContainerId();
-      completedContainer(rmContainer,
-          ContainerStatus.newInstance(containerId,
-              ContainerState.COMPLETE,
-              String.format(
-                  "Container=%s killed since labels on the node=%s changed",
-                  containerId.toString(), nodeId.toString()),
-              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
-          RMContainerEventType.KILL);
+      FiCaSchedulerApp application =
+          getApplicationAttempt(rmContainer.getApplicationAttemptId());
+      if (null != application) {
+        application.nodePartitionUpdated(rmContainer, oldPartition,
+            newPartition);
+      } else {
+        LOG.warn("Cannot find SchedulerApplicationAttempt for an RMContainer"
+            + " running on node=" + node.getNodeID() + " applicationAttemptId="
+            + rmContainer.getApplicationAttemptId()
+            + "; skipping resource update for this container.");
+        continue;
+      }
     }
 
     // Unreserve container on this node
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ff1baff2ee1..658eae10f82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1262,6 +1262,22 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  @Override
+  public void incUsedResource(String nodeLabel, Resource resourceToInc,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
+        resourceToInc);
+    super.incUsedResource(nodeLabel, resourceToInc, application);
+  }
+
+  @Override
+  public void decUsedResource(String nodeLabel, Resource resourceToDec,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
+        resourceToDec);
+    super.decUsedResource(nodeLabel, resourceToDec, application);
+  }
+
   @VisibleForTesting
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 74d77f59b18..300cba97600 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -443,4 +443,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           schedulingMode, currentResourceLimits, reservedContainer);
     }
   }
+
+  public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
+      String newPartition) {
+    Resource containerResource = rmContainer.getAllocatedResource();
+    this.attemptResourceUsage.decUsed(oldPartition, containerResource);
+    this.attemptResourceUsage.incUsed(newPartition, containerResource);
+    getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
+    getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 0a701d8c9da..94af4e0bffe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -19,22 +19,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,8 +104,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
         .getMemory());
   }
 
+  private void checkUserUsedResource(MockRM rm, String queueName,
+      String userName, String partition, int memory) {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
+    LeafQueue.User user = queue.getUser(userName);
+    Assert.assertEquals(memory,
+        user.getResourceUsage().getUsed(partition).getMemory());
+  }
+
   @Test(timeout = 60000)
-  public void testResourceUsage() throws Exception {
+  public void testRequestContainerAfterNodePartitionUpdated()
+      throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
         "z"));
@@ -160,7 +177,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
   }
 
   @Test (timeout = 60000)
-  public void testNodeUpdate() throws Exception {
+  public void testResourceUsageWhenNodeUpdatesPartition()
+      throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
         "z"));
@@ -183,8 +201,9 @@
     MockNM nm1 = rm.registerNode("h1:1234", 8000);
     MockNM nm2 = rm.registerNode("h2:1234", 8000);
     MockNM nm3 = rm.registerNode("h3:1234", 8000);
-
-    ContainerId containerId;
+
+    ContainerId containerId1;
+    ContainerId containerId2;
 
     // launch an app to queue a1 (label = x), and check all container will
     // be allocated in h1
@@ -193,9 +212,9 @@
 
     // request a container.
     am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
+    containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId2,
         RMContainerState.ALLOCATED, 10 * 1000));
 
     // check used resource:
     // queue-a used x=1G, ""=1G
     checkUsedResource(rm, "a", 1024, "x");
     checkUsedResource(rm, "a", 1024);
 
-    // change h1's label to z, container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
-        toSet("z")));
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
 
-    // check used resource:
-    // queue-a used x=0G, ""=1G ("" not changed)
+    // change h1's label to z
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("z"))));
     checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 1024, "z");
     checkUsedResource(rm, "a", 1024);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 1024, "z");
+    checkUsedResource(rm, "root", 1024);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 1024);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
 
-    // request a container with label = y
-    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
+    // change h1's label to y
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("y"))));
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 1024, "y");
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 1024);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 1024, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 1024);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 1024);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
+
+    // change h1's label to no label
+    Set<String> emptyLabels = new HashSet<>();
+    Map<NodeId, Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
+        emptyLabels);
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 2048);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 0, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 2048);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    checkUserUsedResource(rm, "a", "user", "", 2048);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(2048,
+        app.getAppAttemptResourceUsage().getUsed("").getMemory());
+
+    // Finish the two containers; we should see used resources become 0
+    cs.completedContainer(cs.getRMContainer(containerId2),
+        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+        RMContainerEventType.KILL);
+    cs.completedContainer(cs.getRMContainer(containerId1),
+        ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
+            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+        RMContainerEventType.KILL);
+
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 0);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 0, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 0);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    checkUserUsedResource(rm, "a", "user", "", 0);
+
+    rm.close();
+  }
+
+
+  @Test (timeout = 60000)
+  public void testComplexResourceUsageWhenNodeUpdatesPartition()
+      throws Exception {
+    /*
+     * This test is similar to testResourceUsageWhenNodeUpdatesPartition, but it
+     * includes multiple applications, multiple users and multiple containers
+     * running on a single node; the size of each container is 1G.
+     *
+     * Node 1
+     * ------
+     * App1-container3
+     * App2-container2
+     * App2-container3
+     *
+     * Node 2
+     * ------
+     * App2-container1
+     * App1-container1
+     * App1-container2
+     */
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> (no label)
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 80000);
+    MockNM nm2 = rm.registerNode("h2:1234", 80000);
+
+    // app1
+    RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // c2 on n1, c3 on n2
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
     containerId =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
     Assert.assertTrue(rm.waitForState(nm2, containerId,
         RMContainerState.ALLOCATED, 10 * 1000));
 
-    // check used resource:
-    // queue-a used y=1G, ""=1G
-    checkUsedResource(rm, "a", 1024, "y");
-    checkUsedResource(rm, "a", 1024);
-
-    // change h2's label to no label, container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
-        CommonNodeLabelsManager.EMPTY_STRING_SET));
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
-
-    // check used resource:
-    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
-    checkUsedResource(rm, "a", 0, "x");
-    checkUsedResource(rm, "a", 0, "y");
-    checkUsedResource(rm, "a", 1024);
-
+    // app2
+    RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+    // c2/c3 on n1
+    am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
     containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
-
-    // change h3's label to z, AM container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
-        toSet("z")));
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
     Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
+        RMContainerState.ALLOCATED, 10 * 1000));
 
     // check used resource:
-    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+    // queue-a used x=3G, ""=3G
+    checkUsedResource(rm, "a", 3 * GB, "x");
+    checkUsedResource(rm, "a", 3 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp application1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp application2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // change h1's label to z
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("z"))));
     checkUsedResource(rm, "a", 0, "x");
-    checkUsedResource(rm, "a", 0, "y");
-    checkUsedResource(rm, "a", 0);
+    checkUsedResource(rm, "a", 3 * GB, "z");
+    checkUsedResource(rm, "a", 3 * GB);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 3 * GB, "z");
+    checkUsedResource(rm, "root", 3 * GB);
+    checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
+    checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
+    checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
+    checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
+    checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
+    checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
+    Assert.assertEquals(0,
+        application1.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1 * GB,
+        application1.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(2 * GB,
+        application1.getAppAttemptResourceUsage().getUsed("").getMemory());
+    Assert.assertEquals(0,
+        application2.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(2 * GB,
+        application2.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(1 * GB,
+        application2.getAppAttemptResourceUsage().getUsed("").getMemory());
 
     rm.close();
   }
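
Reviewer note: the core of this patch is pure bookkeeping. When a node moves from one partition to another, each running container's usage is decremented under the old partition and incremented under the new one, at every accounting level (app attempt, user, leaf queue, parent queues) instead of killing the container. The standalone sketch below is not Hadoop code; `PartitionUsage` and `NodePartitionUpdateSketch` are hypothetical names invented for this illustration of the same move on a simple per-partition usage table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's ResourceUsage: memory (MB) per partition.
class PartitionUsage {
    private final Map<String, Long> usedByPartition = new HashMap<>();

    void incUsed(String partition, long memory) {
        usedByPartition.merge(partition, memory, Long::sum);
    }

    void decUsed(String partition, long memory) {
        usedByPartition.merge(partition, -memory, Long::sum);
    }

    long getUsed(String partition) {
        return usedByPartition.getOrDefault(partition, 0L);
    }
}

public class NodePartitionUpdateSketch {
    public static void main(String[] args) {
        PartitionUsage appUsage = new PartitionUsage();
        PartitionUsage queueUsage = new PartitionUsage();

        // A 1 GB container is running on a node currently in partition "x".
        appUsage.incUsed("x", 1024);
        queueUsage.incUsed("x", 1024);

        // The node's label changes from "x" to "z". Instead of killing the
        // container (the pre-YARN-4082 behavior), move its usage: exactly one
        // dec on the old partition and one inc on the new, per level.
        String oldPartition = "x", newPartition = "z";
        long containerMemory = 1024;
        appUsage.decUsed(oldPartition, containerMemory);
        appUsage.incUsed(newPartition, containerMemory);
        queueUsage.decUsed(oldPartition, containerMemory);
        queueUsage.incUsed(newPartition, containerMemory);

        // Mirrors the updated test's assertions: x drops to 0, z gains 1 GB.
        System.out.println("app:   x=" + appUsage.getUsed("x")
            + " z=" + appUsage.getUsed("z"));
        System.out.println("queue: x=" + queueUsage.getUsed("x")
            + " z=" + queueUsage.getUsed("z"));
    }
}
```

Because each move is a paired dec/inc at every level, per-queue totals are conserved across a relabel, which is exactly what the updated tests assert: the "x" partition drops to 0 while "z" gains the same 1024 MB, and the unlabeled ("") usage is untouched.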