diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ec9630296ef..b8a938d46bc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -244,6 +244,9 @@ Release 2.6.0 - UNRELEASED
YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5.
(Wei Yan via kasha)
+ YARN-1959. Fix headroom calculation in FairScheduler.
+ (Anubhav Dhoot via kasha)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 825c3985c77..b9966e7f551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -171,6 +171,33 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ priority + "; currentReservation " + currentReservation);
}
+ /**
+  * Computes the headroom reported for this application attempt.
+  *
+  * The cluster's available resource is derived as the scheduler's cluster
+  * resource minus the allocated resources reported by the root queue
+  * metrics. The queue's scheduling policy then combines that value with the
+  * queue's fair share and current usage to produce the headroom.
+  *
+  * @return the headroom calculated by the queue's scheduling policy
+  */
+ @Override
+ public synchronized Resource getHeadroom() {
+   final FSQueue queue = (FSQueue) this.queue;
+   SchedulingPolicy policy = queue.getPolicy();
+
+   Resource queueFairShare = queue.getFairShare();
+   Resource queueUsage = queue.getResourceUsage();
+   Resource clusterResource = this.scheduler.getClusterResource();
+   Resource clusterUsage = this.scheduler.getRootQueueMetrics()
+       .getAllocatedResources();
+   Resource clusterAvailableResource = Resources.subtract(clusterResource,
+       clusterUsage);
+   Resource headroom = policy.getHeadroom(queueFairShare,
+       queueUsage, clusterAvailableResource);
+   if (LOG.isDebugEnabled()) {
+     // Message layout: Min((fairShare - usage), available(total - used)) Headroom=...
+     // The closing parenthesis of "Min(" and the separator before "Headroom="
+     // keep the logged expression balanced and readable.
+     LOG.debug("Headroom calculation for " + this.getName() + ":" +
+         "Min(" +
+         "(queueFairShare=" + queueFairShare +
+         " - queueUsage=" + queueUsage + ")," +
+         " clusterAvailableResource=" + clusterAvailableResource +
+         "(clusterResource=" + clusterResource +
+         " - clusterUsage=" + clusterUsage + "))" +
+         " Headroom=" + headroom);
+   }
+   return headroom;
+ }
+
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index ca006c580ed..4f3123dffdd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -175,4 +175,19 @@ public abstract class SchedulingPolicy {
*/
public abstract boolean checkIfAMResourceUsageOverLimit(
Resource usage, Resource maxAMResource);
+
+ /**
+ * Get headroom by calculating the min of {@code clusterAvailable} and
+ * ({@code queueFairShare} - {@code queueUsage}) resources that are
+ * applicable to this policy. For example, if the policy considers only
+ * memory, then other resources such as CPU are left the same as
+ * {@code clusterAvailable}.
+ *
+ * @param queueFairShare fair share of the queue
+ * @param queueUsage resources used in the queue
+ * @param clusterAvailable available resource in cluster
+ * @return calculated headroom
+ */
+ public abstract Resource getHeadroom(Resource queueFairShare,
+ Resource queueUsage, Resource clusterAvailable);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 42044bcaac1..3f6cbd19adb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -77,7 +77,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
ComputeFairShares.computeSteadyShares(queues, totalResources, type);
}
}
-
+
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare);
@@ -88,6 +88,21 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
return !Resources.fitsIn(usage, maxAMResource);
}
+ /**
+  * Computes headroom on both memory and virtual cores: for each of the two,
+  * the unused part of the queue's fair share (never below zero), capped by
+  * what the cluster has available.
+  *
+  * @param queueFairShare fair share of the queue
+  * @param queueUsage resources used in the queue
+  * @param clusterAvailable available resource in cluster
+  * @return headroom limited on memory and on virtual cores
+  */
+ @Override
+ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+     Resource clusterAvailable) {
+   // Remaining fair share per dimension; a queue over its share has none.
+   int unusedMemory = queueFairShare.getMemory() - queueUsage.getMemory();
+   if (unusedMemory < 0) {
+     unusedMemory = 0;
+   }
+   int unusedVCores =
+       queueFairShare.getVirtualCores() - queueUsage.getVirtualCores();
+   if (unusedVCores < 0) {
+     unusedVCores = 0;
+   }
+
+   // The queue cannot be offered more than the cluster still has free.
+   final int memoryHeadroom =
+       Math.min(clusterAvailable.getMemory(), unusedMemory);
+   final int vCoreHeadroom =
+       Math.min(clusterAvailable.getVirtualCores(), unusedVCores);
+   return Resources.createResource(memoryHeadroom, vCoreHeadroom);
+ }
+
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 66bb88bf16c..97669cb4e27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -114,6 +114,17 @@ public class FairSharePolicy extends SchedulingPolicy {
return comparator;
}
+ /**
+  * Computes headroom on memory only: the unused part of the queue's fair
+  * share of memory (never below zero), capped by the cluster's available
+  * memory. Virtual cores are taken unchanged from {@code clusterAvailable}.
+  *
+  * @param queueFairShare fair share of the queue
+  * @param queueUsage resources used in the queue
+  * @param clusterAvailable available resource in cluster
+  * @return headroom limited on memory, with the cluster's available cores
+  */
+ @Override
+ public Resource getHeadroom(Resource queueFairShare,
+     Resource queueUsage, Resource clusterAvailable) {
+   final int shareMinusUsage =
+       queueFairShare.getMemory() - queueUsage.getMemory();
+   // A queue already over its fair share has no memory left to claim.
+   final int queueMemoryLeft = shareMinusUsage > 0 ? shareMinusUsage : 0;
+   final int memoryHeadroom =
+       Math.min(clusterAvailable.getMemory(), queueMemoryLeft);
+   return Resources.createResource(memoryHeadroom,
+       clusterAvailable.getVirtualCores());
+ }
+
@Override
public void computeShares(Collection extends Schedulable> schedulables,
Resource totalResources) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 591ee4936b9..a2e17ecb0a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -107,6 +107,18 @@ public class FifoPolicy extends SchedulingPolicy {
return usage.getMemory() > maxAMResource.getMemory();
}
+ /**
+  * Computes headroom from memory alone. The memory component is the smaller
+  * of the cluster's available memory and the queue's fair-share memory not
+  * yet used (floored at zero); the virtual-core component is copied from
+  * {@code clusterAvailable}.
+  *
+  * @param queueFairShare fair share of the queue
+  * @param queueUsage resources used in the queue
+  * @param clusterAvailable available resource in cluster
+  * @return headroom limited on memory, with the cluster's available cores
+  */
+ @Override
+ public Resource getHeadroom(Resource queueFairShare,
+     Resource queueUsage, Resource clusterAvailable) {
+   int memoryLeftInQueue =
+       queueFairShare.getMemory() - queueUsage.getMemory();
+   if (memoryLeftInQueue < 0) {
+     // Usage above fair share leaves nothing for the queue.
+     memoryLeftInQueue = 0;
+   }
+   return Resources.createResource(
+       Math.min(clusterAvailable.getMemory(), memoryLeftInQueue),
+       clusterAvailable.getVirtualCores());
+ }
+
+
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 0ab1f70147b..f560690d935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals;
@@ -26,7 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}
+
+ /**
+  * Checks the headroom reported by an FSAppAttempt under each scheduling
+  * policy, using a mocked scheduler and queue with fixed resource values.
+  */
+ @Test
+ public void testHeadroom() {
+   final FairScheduler schedulerMock = Mockito.mock(FairScheduler.class);
+   Mockito.when(schedulerMock.getClock()).thenReturn(scheduler.getClock());
+
+   final FSLeafQueue queueMock = Mockito.mock(FSLeafQueue.class);
+   // Queue: fair share (4096, 4), usage (1024, 1).
+   final Resource fairShare = Resources.createResource(4096, 4);
+   final Resource usedByQueue = Resource.newInstance(1024, 1);
+   // Cluster: total (8192, 8), allocated (6144, 2).
+   final Resource clusterTotal = Resources.createResource(8192, 8);
+   final Resource clusterAllocated = Resources.createResource(6144, 2);
+   final QueueMetrics rootMetricsMock = Mockito.mock(QueueMetrics.class);
+
+   ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
+   RMContext context = resourceManager.getRMContext();
+   FSAppAttempt attempt = new FSAppAttempt(
+       schedulerMock, attemptId, "user1", queueMock, null, context);
+
+   // Wire the fixed values into the mocks the attempt will query.
+   Mockito.when(queueMock.getFairShare()).thenReturn(fairShare);
+   Mockito.when(queueMock.getResourceUsage()).thenReturn(usedByQueue);
+   Mockito.when(schedulerMock.getClusterResource())
+       .thenReturn(clusterTotal);
+   Mockito.when(rootMetricsMock.getAllocatedResources())
+       .thenReturn(clusterAllocated);
+   Mockito.when(schedulerMock.getRootQueueMetrics())
+       .thenReturn(rootMetricsMock);
+
+   // Expected values for the fixture above.
+   final int expectedMemory = 2048;
+   final int expectedClusterCPU = 6;
+   final int expectedQueueCPU = 3;
+
+   // DominantResourceFairnessPolicy takes the min of memory and CPU across
+   // cluster and queue.
+   Mockito.when(queueMock.getPolicy()).thenReturn(
+       SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class));
+   verifyHeadroom(attempt, expectedMemory, expectedQueueCPU);
+
+   // Fair and Fifo ignore the queue's CPU, so the cluster's available CPU
+   // is expected instead.
+   Mockito.when(queueMock.getPolicy()).thenReturn(
+       SchedulingPolicy.getInstance(FairSharePolicy.class));
+   verifyHeadroom(attempt, expectedMemory, expectedClusterCPU);
+
+   Mockito.when(queueMock.getPolicy()).thenReturn(
+       SchedulingPolicy.getInstance(FifoPolicy.class));
+   verifyHeadroom(attempt, expectedMemory, expectedClusterCPU);
+ }
+
+ /**
+  * Asserts that the headroom reported by the given attempt has the expected
+  * memory and virtual-core values.
+  *
+  * @param schedulerApp attempt whose headroom is queried
+  * @param expectedMemory expected memory component of the headroom
+  * @param expectedCPU expected virtual-core component of the headroom
+  */
+ protected void verifyHeadroom(FSAppAttempt schedulerApp,
+     int expectedMemory, int expectedCPU) {
+   // Query once so both assertions look at the same computed value.
+   final Resource reported = schedulerApp.getHeadroom();
+   final int actualMemory = reported.getMemory();
+   assertEquals(expectedMemory, actualMemory);
+   final int actualCPU = reported.getVirtualCores();
+   assertEquals(expectedCPU, actualCPU);
+ }
}