From 8ba8521de5bc8ddefd60196eda814d2e59432b2f Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Mon, 22 Sep 2014 23:49:39 -0700
Subject: [PATCH] YARN-1959. Fix headroom calculation in FairScheduler.
 (Anubhav Dhoot via kasha)

(cherry picked from commit 568d3dc2bbe43b7d2833d5da2b0e6d75eb86e5dd)
---
 hadoop-yarn-project/CHANGES.txt                |  3 +
 .../scheduler/fair/FSAppAttempt.java           | 27 ++++++++
 .../scheduler/fair/SchedulingPolicy.java       | 15 +++++
 .../DominantResourceFairnessPolicy.java        | 17 ++++-
 .../fair/policies/FairSharePolicy.java         | 11 ++++
 .../scheduler/fair/policies/FifoPolicy.java    | 12 ++++
 .../scheduler/fair/TestFSAppAttempt.java       | 63 +++++++++++++++++++
 7 files changed, 147 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 060285178ea..d3b69fcecc3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -214,6 +214,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5.
     (Wei Yan via kasha)
 
+    YARN-1959. Fix headroom calculation in FairScheduler.
+    (Anubhav Dhoot via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 825c3985c77..b9966e7f551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -171,6 +171,33 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         + priority + "; currentReservation " + currentReservation);
   }
 
+  @Override
+  public synchronized Resource getHeadroom() {
+    final FSQueue queue = (FSQueue) this.queue;
+    SchedulingPolicy policy = queue.getPolicy();
+
+    Resource queueFairShare = queue.getFairShare();
+    Resource queueUsage = queue.getResourceUsage();
+    Resource clusterResource = this.scheduler.getClusterResource();
+    Resource clusterUsage = this.scheduler.getRootQueueMetrics()
+        .getAllocatedResources();
+    Resource clusterAvailableResource = Resources.subtract(clusterResource,
+        clusterUsage);
+    Resource headroom = policy.getHeadroom(queueFairShare,
+        queueUsage, clusterAvailableResource);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Headroom calculation for " + this.getName() + ":" +
+          "Min(" +
+          "(queueFairShare=" + queueFairShare +
+          " - queueUsage=" + queueUsage + ")," +
+          " clusterAvailableResource=" + clusterAvailableResource +
+          "(clusterResource=" + clusterResource +
+          " - clusterUsage=" + clusterUsage + ")" +
+          "Headroom=" + headroom);
+    }
+    return headroom;
+  }
+
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index ca006c580ed..4f3123dffdd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -175,4 +175,19 @@ public abstract class SchedulingPolicy {
    */
   public abstract boolean checkIfAMResourceUsageOverLimit(
       Resource usage, Resource maxAMResource);
+
+  /**
+   * Get headroom by calculating the min of clusterAvailable and
+   * (queueFairShare - queueUsage) resources that are
+   * applicable to this policy. For eg if only memory then leave other
+   * resources such as CPU to same as clusterAvailable.
+   *
+   * @param queueFairShare fairshare in the queue
+   * @param queueUsage resources used in the queue
+   * @param clusterAvailable available resource in cluster
+   * @return calculated headroom
+   */
+  public abstract Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable);
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 42044bcaac1..3f6cbd19adb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -77,7 +77,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
       ComputeFairShares.computeSteadyShares(queues, totalResources, type);
     }
   }
-  
+
   @Override
   public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
     return !Resources.fitsIn(usage, fairShare);
@@ -88,6 +88,21 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
     return !Resources.fitsIn(usage, maxAMResource);
   }
 
+  @Override
+  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+      Resource clusterAvailable) {
+    int queueAvailableMemory =
+        Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    int queueAvailableCPU =
+        Math.max(queueFairShare.getVirtualCores() - queueUsage
+            .getVirtualCores(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        Math.min(clusterAvailable.getVirtualCores(),
+            queueAvailableCPU));
+    return headroom;
+  }
+
   @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 66bb88bf16c..97669cb4e27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -114,6 +114,17 @@ public class FairSharePolicy extends SchedulingPolicy {
     return comparator;
   }
 
+  @Override
+  public Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable) {
+    int queueAvailableMemory = Math.max(
+        queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        clusterAvailable.getVirtualCores());
+    return headroom;
+  }
+
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 591ee4936b9..a2e17ecb0a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -107,6 +107,18 @@ public class FifoPolicy extends SchedulingPolicy {
     return usage.getMemory() > maxAMResource.getMemory();
   }
 
+  @Override
+  public Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable) {
+    int queueAvailableMemory = Math.max(
+        queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        clusterAvailable.getVirtualCores());
+    return headroom;
+  }
+
+
   @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 0ab1f70147b..f560690d935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 
 import static org.junit.Assert.assertEquals;
@@ -26,7 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
     assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
         prio, 10, -1.0, -1.0));
   }
+
+  @Test
+  public void testHeadroom() {
+    final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
+    Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
+
+    final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
+    final Resource queueFairShare = Resources.createResource(4096, 4);
+    final Resource queueUsage = Resource.newInstance(1024, 1);
+    final Resource clusterResource = Resources.createResource(8192, 8);
+    final Resource clusterUsage = Resources.createResource(6144, 2);
+    final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    RMContext rmContext = resourceManager.getRMContext();
+    FSAppAttempt schedulerApp =
+        new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
+            null, rmContext);
+
+    Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
+    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockScheduler.getClusterResource()).thenReturn
+        (clusterResource);
+    Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
+        (clusterUsage);
+    Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
+        (fakeRootQueueMetrics);
+
+    int minClusterAvailableMemory = 2048;
+    int minClusterAvailableCPU = 6;
+    int minQueueAvailableCPU = 3;
+
+    // Min of Memory and CPU across cluster and queue is used in
+    // DominantResourceFairnessPolicy
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(DominantResourceFairnessPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minQueueAvailableCPU);
+
+    // Fair and Fifo ignore CPU of queue, so use cluster available CPU
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FairSharePolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FifoPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+  }
+
+  protected void verifyHeadroom(FSAppAttempt schedulerApp,
+      int expectedMemory, int expectedCPU) {
+    Resource headroom = schedulerApp.getHeadroom();
+    assertEquals(expectedMemory, headroom.getMemory());
+    assertEquals(expectedCPU, headroom.getVirtualCores());
+  }
 }
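
Note (not part of the patch): the rule the change introduces is headroom = min(queueFairShare - queueUsage, clusterAvailable), applied per resource dimension the policy considers -- memory and vcores for DominantResourceFairnessPolicy, memory only for FairSharePolicy and FifoPolicy, which pass the cluster-available vcores through unchanged. The self-contained sketch below reproduces that arithmetic with the values from testHeadroom; it uses plain ints in place of the YARN Resource type, so the class and method names here are illustrative only.

// Illustrative sketch only; plain ints stand in for
// org.apache.hadoop.yarn.api.records.Resource ({memoryMB, vcores}).
public class HeadroomSketch {

  // DRF-style headroom: clamp both memory and CPU to the queue's remaining fair share.
  static int[] drfHeadroom(int[] queueFairShare, int[] queueUsage, int[] clusterAvailable) {
    int queueAvailableMemory = Math.max(queueFairShare[0] - queueUsage[0], 0);
    int queueAvailableCpu = Math.max(queueFairShare[1] - queueUsage[1], 0);
    return new int[] {
        Math.min(clusterAvailable[0], queueAvailableMemory),
        Math.min(clusterAvailable[1], queueAvailableCpu)};
  }

  // FairShare/Fifo-style headroom: clamp memory only, keep cluster-available CPU.
  static int[] memoryOnlyHeadroom(int[] queueFairShare, int[] queueUsage, int[] clusterAvailable) {
    int queueAvailableMemory = Math.max(queueFairShare[0] - queueUsage[0], 0);
    return new int[] {
        Math.min(clusterAvailable[0], queueAvailableMemory),
        clusterAvailable[1]};
  }

  public static void main(String[] args) {
    // Values from testHeadroom.
    int[] queueFairShare = {4096, 4};
    int[] queueUsage = {1024, 1};
    int[] clusterResource = {8192, 8};
    int[] clusterUsage = {6144, 2};
    int[] clusterAvailable = {
        clusterResource[0] - clusterUsage[0],   // 2048 MB
        clusterResource[1] - clusterUsage[1]};  // 6 vcores

    int[] drf = drfHeadroom(queueFairShare, queueUsage, clusterAvailable);
    int[] mem = memoryOnlyHeadroom(queueFairShare, queueUsage, clusterAvailable);
    // Matches the test's expectations: DRF -> 2048 MB, 3 vcores;
    // FairShare/Fifo -> 2048 MB, 6 vcores.
    System.out.println("DRF headroom: " + drf[0] + " MB, " + drf[1] + " vcores");
    System.out.println("Fair/Fifo headroom: " + mem[0] + " MB, " + mem[1] + " vcores");
  }
}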