From f9ef3e3719fa4a88c6fe343f5953095ece205c12 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 29 Mar 2016 17:07:55 -0700 Subject: [PATCH] YARN-4865. Track Reserved resources in ResourceUsage and QueueCapacities. (Sunil G via wangda) --- .../resourcemanager/scheduler/Queue.java | 20 +++++ .../scheduler/capacity/AbstractCSQueue.java | 26 ++++++- .../scheduler/capacity/CSQueueUtils.java | 13 ++++ .../scheduler/capacity/LeafQueue.java | 16 +++- .../scheduler/capacity/QueueCapacities.java | 38 ++++++++- .../scheduler/fair/FSQueue.java | 8 ++ .../scheduler/fifo/FifoScheduler.java | 12 +++ .../capacity/TestContainerAllocation.java | 11 ++- .../TestNodeLabelContainerAllocation.java | 77 +++++++++++++++++++ .../capacity/TestQueueCapacities.java | 4 +- 10 files changed, 220 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 8646381febc..ada2a0b1eea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -118,4 +118,24 @@ public interface Queue { * @return default application priority */ public Priority getDefaultApplicationPriority(); + + /** + * Increment Reserved Capacity + * + * @param partition + * asked by application + * @param reservedRes + * reserved resource asked + */ + public void incReservedResource(String partition, Resource reservedRes); + + /** + * Decrement Reserved Capacity + * + * @param partition + * asked by application + * @param reservedRes + * reserved resource asked + */ + public void decReservedResource(String partition, Resource reservedRes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 66533b2e06f..5757d02f187 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -536,7 +536,31 @@ public abstract class AbstractCSQueue implements CSQueue { } return true; } - + + @Override + public void incReservedResource(String partition, Resource reservedRes) { + if (partition == null) { + partition = RMNodeLabelsManager.NO_LABEL; + } + + queueUsage.incReserved(partition, reservedRes); + if(null != parent){ + parent.incReservedResource(partition, reservedRes); + } + } + + @Override + public void decReservedResource(String partition, Resource reservedRes) { + if (partition == null) { + partition = RMNodeLabelsManager.NO_LABEL; + } + + queueUsage.decReserved(partition, reservedRes); + if(null != parent){ + parent.decReservedResource(partition, reservedRes); + } + } + @Override public void incPendingResource(String nodeLabel, Resource resourceToInc) { if (nodeLabel == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 5e9a5af9742..d5cdb3221f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -186,6 +186,8 @@ class CSQueueUtils { String nodePartition) { float absoluteUsedCapacity = 0.0f; float usedCapacity = 0.0f; + float reservedCapacity = 0.0f; + float absoluteReservedCapacity = 0.0f; if (Resources.greaterThan(rc, totalPartitionResource, totalPartitionResource, Resources.none())) { @@ -207,11 +209,22 @@ class CSQueueUtils { usedCapacity = Resources.divide(rc, totalPartitionResource, usedResource, queueGuranteedResource); + + Resource resResource = queueResourceUsage.getReserved(nodePartition); + reservedCapacity = + Resources.divide(rc, totalPartitionResource, resResource, + queueGuranteedResource); + absoluteReservedCapacity = + Resources.divide(rc, totalPartitionResource, resResource, + totalPartitionResource); } queueCapacities .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity); queueCapacities.setUsedCapacity(nodePartition, usedCapacity); + queueCapacities.setReservedCapacity(nodePartition, reservedCapacity); + queueCapacities + .setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity); } private static Resource getMaxAvailableResourceToQueue( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 4b138eba4fc..2a9c00621c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -994,6 +994,13 @@ public class LeafQueue extends AbstractCSQueue { node.getPartition(), reservedOrAllocatedRMContainer, assignment.isIncreasedAllocation()); + // Update reserved metrics + Resource reservedRes = assignment.getAssignmentInformation() + .getReserved(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + incReservedResource(node.getPartition(), reservedRes); + } + // Done return assignment; } else if (assignment.getSkippedType() @@ -1433,7 +1440,14 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping if (removed) { - + + // track reserved resource for metrics, for normal container + // getReservedResource will be null. + Resource reservedRes = rmContainer.getReservedResource(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + decReservedResource(node.getPartition(), reservedRes); + } + // Inform the ordering policy orderingPolicy.containerReleased(application, rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index f2c26327ae0..cc4af3dfb47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -50,7 +50,7 @@ public class QueueCapacities { // Usage enum here to make implement cleaner private enum CapacityType { USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5), - MAX_AM_PERC(6); + MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8); private int idx; @@ -76,6 +76,8 @@ public class QueueCapacities { sb.append("cap=" + capacitiesArr[4] + "%, "); sb.append("abs_cap=" + capacitiesArr[5] + "%}"); sb.append("max_am_perc=" + capacitiesArr[6] + "%}"); + sb.append("reserved_cap=" + capacitiesArr[7] + "%}"); + sb.append("abs_reserved_cap=" + capacitiesArr[8] + "%}"); return sb.toString(); } } @@ -234,6 +236,40 @@ public class QueueCapacities { _set(NL, CapacityType.MAX_AM_PERC, value); } + /* Reserved Capacity Getter and Setter */ + public float getReservedCapacity() { + return _get(NL, CapacityType.RESERVED_CAP); + } + + public float getReservedCapacity(String label) { + return _get(label, CapacityType.RESERVED_CAP); + } + + public void setReservedCapacity(float value) { + _set(NL, CapacityType.RESERVED_CAP, value); + } + + public void setReservedCapacity(String label, float value) { + _set(label, CapacityType.RESERVED_CAP, value); + } + + /* Absolute Reserved Capacity Getter and Setter */ + public float getAbsoluteReservedCapacity() { + return _get(NL, CapacityType.ABS_RESERVED_CAP); + } + + public float getAbsoluteReservedCapacity(String label) { + return _get(label, CapacityType.ABS_RESERVED_CAP); + } + + public void setAbsoluteReservedCapacity(float value) { + _set(NL, CapacityType.ABS_RESERVED_CAP, value); + } + + public void setAbsoluteReservedCapacity(String label, float value) { + _set(label, CapacityType.ABS_RESERVED_CAP, value); + } + /** * Clear configurable fields, like * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c5edbfbab43..e4a21978cb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -331,6 +331,14 @@ public abstract class FSQueue implements Queue, Schedulable { public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + @Override + public void incReservedResource(String nodeLabel, Resource resourceToInc) { + } + + @Override + public void decReservedResource(String nodeLabel, Resource resourceToDec) { + } + @Override public Priority getDefaultApplicationPriority() { // TODO add implementation for FSParentQueue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 12b060ee889..409a043cca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -219,6 +219,18 @@ public class FifoScheduler extends // TODO add implementation for FIFO scheduler return null; } + + @Override + public void incReservedResource(String partition, Resource reservedRes) { + // TODO add implementation for FIFO scheduler + + } + + @Override + public void decReservedResource(String partition, Resource reservedRes) { + // TODO add implementation for FIFO scheduler + + } }; public FifoScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index aefdaeae538..3a6ca82184e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -368,7 +368,8 @@ public class TestContainerAllocation { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - + LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + // Do node heartbeats 2 times // First time will allocate container for app1, second time will reserve // container for app2 @@ -394,6 +395,10 @@ public class TestContainerAllocation { // Usage of queue = 4G + 2 * 1G + 4G (reserved) Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage() .getUsed().getMemorySize()); + Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage() + .getReserved().getMemorySize()); + Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved() + .getMemorySize()); // Cancel asks of app2 and re-kick RM am2.allocate("*", 4 * GB, 0, new ArrayList()); @@ -406,6 +411,10 @@ public class TestContainerAllocation { Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage() .getUsed().getMemorySize()); + Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage() + .getReserved().getMemorySize()); + Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved() + .getMemorySize()); rm1.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 9cc3440200d..6b9f676c018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -466,6 +466,83 @@ public class TestNodeLabelContainerAllocation { rm1.close(); } + @Test (timeout = 120000) + public void testContainerReservationWithLabels() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. + // Difference is, this test doesn't specify label expression in + // ResourceRequest, + // instead, it uses default queue label expression + + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", + "z")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("x"), NodeId.newInstance("h2", 0), toSet("y"), + NodeId.newInstance("h3", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM( + TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x + rm1.registerNode("h2:1234", 8 * GB); // label = y + rm1.registerNode("h3:1234", 8 * GB); // label = x + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("*", 4 * GB, 2, new ArrayList()); + containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + + // Do node heartbeats 2 times + // First time will allocate container for app1, second time will reserve + // container for app1 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // Check if a 4G container allocated for app1, and 4G is reserved + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0); + Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage() + .getUsed("x").getMemorySize()); + Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage() + .getReserved("x").getMemorySize()); + Assert.assertEquals(4 * GB, + leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize()); + + // Cancel asks of app2 and re-kick RM + am1.allocate("*", 4 * GB, 0, new ArrayList()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage() + .getUsed("x").getMemorySize()); + Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage() + .getReserved("x").getMemorySize()); + Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x") + .getMemorySize()); + rm1.close(); + } + private void checkPendingResource(MockRM rm, int priority, ApplicationAttemptId attemptId, int memory) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java index afdc6fcb2ec..ee370b483df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -44,7 +44,9 @@ public class TestQueueCapacities { { "AbsoluteUsedCapacity" }, { "MaximumCapacity" }, { "AbsoluteMaximumCapacity" }, - { "MaxAMResourcePercentage" } }); + { "MaxAMResourcePercentage" }, + { "ReservedCapacity" }, + { "AbsoluteReservedCapacity" }}); } public TestQueueCapacities(String suffix) {