diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index aabdf9c286b..fbcb91c453b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1348,13 +1348,6 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Book-keeping
     if (removed) {
-      // track reserved resource for metrics, for normal container
-      // getReservedResource will be null.
-      Resource reservedRes = rmContainer.getReservedResource();
-      if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-        decReservedResource(node.getPartition(), reservedRes);
-      }
-
       // Inform the ordering policy
       orderingPolicy.containerReleased(application, rmContainer);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f474aad2d0d..35329d27f38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -246,6 +246,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
       // Update reserved metrics
       queue.getMetrics().unreserveResource(getUser(),
           rmContainer.getReservedResource());
+      // Also decrement the queue's reserved-resource accounting here, so
+      // every unreserve path (container completion, node removal) is covered.
+      queue.decReservedResource(node.getPartition(),
+          rmContainer.getReservedResource());
       return true;
     }
     return false;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 84eba109611..f94c963ec4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -37,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -50,8 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -417,5 +425,183 @@ public class TestContainerAllocation {
     rm1.close();
   }
-  
+
+  @Test(timeout = 60000)
+  public void testAllocationForReservedContainer() throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to a queue. There is one
+     * node with 8G of resource in the cluster. App1 asks for a 4G container,
+     * then app2 asks for a 4G container. App2's request will be reserved on
+     * the node.
+     *
+     * Before the next node heartbeat, app1's container is completed/killed,
+     * so the container reserved for app2 will be allocated.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to the queue, AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to the queue, AM container should be launched on nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times.
+    // The first will allocate a container for app1, the second will reserve
+    // a container for app2.
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App1 gets the allocation on node1; app2's request stays reserved.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check a 4G container was allocated for app1 and nothing for app2,
+    // which only has a reservation.
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Mark app1's non-AM container as killed/completed
+    for (RMContainer container : schedulerApp1.getLiveContainers()) {
+      if (container.isAMContainer()) {
+        continue;
+      }
+      cs.markContainerForKillable(container);
+    }
+    // Cancel asks of app1 and re-kick RM
+    am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Check the 4G container was cancelled for app1, and one container
+    // allocated for app2
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G
+    Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testReservedContainerMetricsOnDecommissionedNode()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to a queue. There is one
+     * node with 8G of resource in the cluster. App1 asks for a 4G container,
+     * then app2 asks for a 4G container. App2's request will be reserved on
+     * the node.
+     *
+     * Before the next node heartbeat, the node is removed from the scheduler.
+     * Both the queue's used resource and its reserved resource should drop
+     * back to zero.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to the queue, AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to the queue, AM container should be launched on nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times.
+    // The first will allocate a container for app1, the second will reserve
+    // a container for app2.
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App1 gets the allocation on node1; app2's request stays reserved.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check a 4G container was allocated for app1 and nothing for app2,
+    // which only has a reservation.
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Remove the node
+    cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
+
+    // Check all containers were cancelled for app1 and app2
+    Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // Used and reserved capacity of the queue are both 0
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
 }
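Reviewer note: the essence of this patch is moving the reserved-resource decrement from `LeafQueue.completedContainer` into `FiCaSchedulerApp.unreserve`, so the counter is released on every unreserve path rather than only on container completion. Below is a minimal, self-contained sketch of that accounting invariant; the class and method names are hypothetical simplifications for illustration, not actual YARN APIs.

```java
// Hypothetical, simplified model of the reserved-resource bookkeeping the
// patch centralizes in the unreserve path; not actual YARN code.
public final class ReservedAccounting {
  private long reservedMB = 0;

  /** Record a new reservation of the given size. */
  public void reserve(long mb) {
    reservedMB += mb;
  }

  /**
   * Release a reservation. Every trigger (container completion, cancelled
   * ask, node removal) funnels through this single method, so the counter
   * cannot leak the way it could when only the completion path decremented it.
   */
  public void unreserve(long mb) {
    if (reservedMB - mb < 0) {
      throw new IllegalStateException("reserved resource went negative");
    }
    reservedMB -= mb;
  }

  public long getReservedMB() {
    return reservedMB;
  }

  public static void main(String[] args) {
    ReservedAccounting queue = new ReservedAccounting();
    queue.reserve(4 * 1024);    // app2's 4G request is reserved
    queue.unreserve(4 * 1024);  // node removed: the reservation must be released
    System.out.println("reservedMB = " + queue.getReservedMB()); // 0
  }
}
```

This mirrors what the new `testReservedContainerMetricsOnDecommissionedNode` asserts: after a `NodeRemovedSchedulerEvent`, the queue's used and reserved resources both return to zero.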