diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 343e9c3bcc8..27546167a03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -81,6 +81,7 @@ public class FSLeafQueue extends FSQueue { this.lastTimeAtMinShare = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); + getMetrics().setAMResourceUsage(amResourceUsage); } void addApp(FSAppAttempt app, boolean runnable) { @@ -132,6 +133,7 @@ public class FSLeafQueue extends FSQueue { // running an unmanaged AM. if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); + getMetrics().setAMResourceUsage(amResourceUsage); } return runnable; @@ -468,19 +470,14 @@ public class FSLeafQueue extends FSQueue { } /** - * Check whether this queue can run this application master under the - * maxAMShare limit. - * - * @param amResource resources required to run the AM - * @return true if this queue can run - */ - boolean canRunAppAM(Resource amResource) { - if (Math.abs(maxAMShare - -1.0f) < 0.0001) { - return true; - } - - // If FairShare is zero, use min(maxShare, available resource) to compute - // maxAMResource + * Compute the maximum resource AM can use. The value is the result of + * multiplying FairShare and maxAMShare. If FairShare is zero, use + * min(maxShare, available resource) instead to prevent zero value for + * maximum AM resource since it forbids any job running in the queue. + * + * @return the maximum resource AM can use + */ + private Resource computeMaxAMResource() { Resource maxResource = Resources.clone(getFairShare()); if (maxResource.getMemorySize() == 0) { maxResource.setMemorySize( @@ -494,7 +491,23 @@ public class FSLeafQueue extends FSQueue { getMaxShare().getVirtualCores())); } - Resource maxAMResource = Resources.multiply(maxResource, maxAMShare); + return Resources.multiply(maxResource, maxAMShare); + } + + /** + * Check whether this queue can run the Application Master under the + * maxAMShare limit. + * + * @param amResource resources required to run the AM + * @return true if this queue can run + */ + public boolean canRunAppAM(Resource amResource) { + if (Math.abs(maxAMShare - -1.0f) < 0.0001) { + return true; + } + + Resource maxAMResource = computeMaxAMResource(); + getMetrics().setMaxAMShare(maxAMResource); Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); return Resources.fitsIn(ifRunAMResource, maxAMResource); } @@ -502,6 +515,7 @@ public class FSLeafQueue extends FSQueue { void addAMResourceUsage(Resource amResource) { if (amResource != null) { Resources.addTo(amResourceUsage, amResource); + getMetrics().setAMResourceUsage(amResourceUsage); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 572b5f90daf..38c03408511 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public abstract class FSQueue implements Queue, Schedulable { @@ -160,6 +162,11 @@ public abstract class FSQueue implements Queue, Schedulable { return maxRunningApps; } + @VisibleForTesting + protected float getMaxAMShare() { + return maxAMShare; + } + public void setMaxAMShare(float maxAMShare){ this.maxAMShare = maxAMShare; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 42c8825735d..a970815f62a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -41,6 +41,10 @@ public class FSQueueMetrics extends QueueMetrics { @Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB; @Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores; @Metric("Maximum number of applications") MutableGaugeInt maxApps; + @Metric("Maximum AM share of memory in MB") MutableGaugeLong maxAMShareMB; + @Metric("Maximum AM share of CPU in vcores") MutableGaugeInt maxAMShareVCores; + @Metric("AM resource usage of memory in MB") MutableGaugeLong amResourceUsageMB; + @Metric("AM resource usage of CPU in vcores") MutableGaugeInt amResourceUsageVCores; private String schedulingPolicy; @@ -109,6 +113,62 @@ public class FSQueueMetrics extends QueueMetrics { maxApps.set(max); } + /** + * Get the maximum memory size AM can use in MB. + * + * @return the maximum memory size AM can use + */ + public long getMaxAMShareMB() { + return maxAMShareMB.value(); + } + + /** + * Get the maximum number of VCores AM can use. + * + * @return the maximum number of VCores AM can use + */ + public int getMaxAMShareVCores() { + return maxAMShareVCores.value(); + } + + /** + * Set the maximum resource AM can use. + * + * @param resource the maximum resource AM can use + */ + public void setMaxAMShare(Resource resource) { + maxAMShareMB.set(resource.getMemorySize()); + maxAMShareVCores.set(resource.getVirtualCores()); + } + + /** + * Get the AM memory usage in MB. + * + * @return the AM memory usage + */ + public long getAMResourceUsageMB() { + return amResourceUsageMB.value(); + } + + /** + * Get the AM VCore usage. + * + * @return the AM VCore usage + */ + public int getAMResourceUsageVCores() { + return amResourceUsageVCores.value(); + } + + /** + * Set the AM resource usage. + * + * @param resource the AM resource usage + */ + public void setAMResourceUsage(Resource resource) { + amResourceUsageMB.set(resource.getMemorySize()); + amResourceUsageVCores.set(resource.getVirtualCores()); + } + public String getSchedulingPolicy() { return schedulingPolicy; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ffbfec86ccc..c5ff5e7c861 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -594,6 +594,143 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(0, queue.getFairShare().getMemorySize()); } + /** + * Test if we compute the maximum AM resource correctly. + * + * @throws IOException if scheduler reinitialization fails + */ + @Test + public void testComputeMaxAMResource() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0"); + out.println("0.5"); + out.println("4096 mb 4 vcores"); + out.println(""); + out.println(""); + out.println("0.0"); + out.println("0.5"); + out.println(""); + out.println(""); + out.println("1"); + out.println("0.5"); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + long memCapacity = 20 * GB; + int cpuCapacity = 20; + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(memCapacity, + cpuCapacity), 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + Resource amResource = Resource.newInstance(1 * GB, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + + // queueFSZeroWithMax + FSLeafQueue queueFSZeroWithMax = scheduler.getQueueManager(). + getLeafQueue("queueFSZeroWithMax", true); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queueFSZeroWithMax", "user1", + amResource); + createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + + // queueFSZeroWithMax's weight is 0.0, so its fair share should be 0, we use + // the min(maxShare, available resource) to compute maxAMShare, in this + // case, we use maxShare, since it is smaller than available resource. + assertEquals("QueueFSZeroWithMax's fair share should be zero", + 0, queueFSZeroWithMax.getFairShare().getMemorySize()); + assertEquals("QueueFSZeroWithMax's maximum AM resource should be " + + "maxShare * maxAMShare", + (long)(queueFSZeroWithMax.getMaxShare().getMemorySize() * + queueFSZeroWithMax.getMaxAMShare()), + queueFSZeroWithMax.getMetrics().getMaxAMShareMB()); + assertEquals("QueueFSZeroWithMax's maximum AM resource should be " + + "maxShare * maxAMShare", + (long)(queueFSZeroWithMax.getMaxShare().getVirtualCores() * + queueFSZeroWithMax.getMaxAMShare()), + queueFSZeroWithMax.getMetrics().getMaxAMShareVCores()); + assertEquals("QueueFSZeroWithMax's AM resource usage should be the same to " + + "AM resource request", + amResource.getMemorySize(), + queueFSZeroWithMax.getMetrics().getAMResourceUsageMB()); + + // queueFSZeroWithAVL + amResource = Resources.createResource(1 * GB, 1); + FSLeafQueue queueFSZeroWithAVL = scheduler.getQueueManager(). + getLeafQueue("queueFSZeroWithAVL", true); + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queueFSZeroWithAVL", "user1", + amResource); + createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId2); + scheduler.update(); + scheduler.handle(updateEvent); + + // queueFSZeroWithAVL's weight is 0.0, so its fair share is 0, and we use + // the min(maxShare, available resource) to compute maxAMShare, in this + // case, we use available resource since it is smaller than the + // default maxShare. + assertEquals("QueueFSZeroWithAVL's fair share should be zero", + 0, queueFSZeroWithAVL.getFairShare().getMemorySize()); + assertEquals("QueueFSZeroWithAVL's maximum AM resource should be " + + " available resource * maxAMShare", + (long) ((memCapacity - amResource.getMemorySize()) * + queueFSZeroWithAVL.getMaxAMShare()), + queueFSZeroWithAVL.getMetrics().getMaxAMShareMB()); + assertEquals("QueueFSZeroWithAVL's maximum AM resource should be " + + " available resource * maxAMShare", + (long) ((cpuCapacity - amResource.getVirtualCores()) * + queueFSZeroWithAVL.getMaxAMShare()), + queueFSZeroWithAVL.getMetrics().getMaxAMShareVCores()); + assertEquals("QueueFSZeroWithMax's AM resource usage should be the same to " + + "AM resource request", + amResource.getMemorySize(), + queueFSZeroWithAVL.getMetrics().getAMResourceUsageMB()); + + // queueFSNonZero + amResource = Resources.createResource(1 * GB, 1); + FSLeafQueue queueFSNonZero = scheduler.getQueueManager(). + getLeafQueue("queueFSNonZero", true); + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queueFSNonZero", "user1", + amResource); + createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId3); + scheduler.update(); + scheduler.handle(updateEvent); + + // queueFSNonZero's weight is 1, so its fair share is not 0, and we use the + // fair share to compute maxAMShare + assertNotEquals("QueueFSNonZero's fair share shouldn't be zero", + 0, queueFSNonZero.getFairShare().getMemorySize()); + assertEquals("QueueFSNonZero's maximum AM resource should be " + + " fair share * maxAMShare", + (long)(memCapacity * queueFSNonZero.getMaxAMShare()), + queueFSNonZero.getMetrics().getMaxAMShareMB()); + assertEquals("QueueFSNonZero's maximum AM resource should be " + + " fair share * maxAMShare", + (long)(cpuCapacity * queueFSNonZero.getMaxAMShare()), + queueFSNonZero.getMetrics().getMaxAMShareVCores()); + assertEquals("QueueFSNonZero's AM resource usage should be the same to " + + "AM resource request", + amResource.getMemorySize(), + queueFSNonZero.getMetrics().getAMResourceUsageMB()); + } + @Test public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);