YARN-1959. Fix headroom calculation in FairScheduler. (Anubhav Dhoot via kasha)

(cherry picked from commit 568d3dc2bb)
This commit is contained in:
Karthik Kambatla 2014-09-22 23:49:39 -07:00
parent 34ee084c3d
commit 8ba8521de5
7 changed files with 147 additions and 1 deletions

View File

@ -214,6 +214,9 @@ Release 2.6.0 - UNRELEASED
YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5. YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-1959. Fix headroom calculation in FairScheduler.
(Anubhav Dhoot via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -171,6 +171,33 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ priority + "; currentReservation " + currentReservation); + priority + "; currentReservation " + currentReservation);
} }
@Override
public synchronized Resource getHeadroom() {
final FSQueue queue = (FSQueue) this.queue;
SchedulingPolicy policy = queue.getPolicy();
Resource queueFairShare = queue.getFairShare();
Resource queueUsage = queue.getResourceUsage();
Resource clusterResource = this.scheduler.getClusterResource();
Resource clusterUsage = this.scheduler.getRootQueueMetrics()
.getAllocatedResources();
Resource clusterAvailableResource = Resources.subtract(clusterResource,
clusterUsage);
Resource headroom = policy.getHeadroom(queueFairShare,
queueUsage, clusterAvailableResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for " + this.getName() + ":" +
"Min(" +
"(queueFairShare=" + queueFairShare +
" - queueUsage=" + queueUsage + ")," +
" clusterAvailableResource=" + clusterAvailableResource +
"(clusterResource=" + clusterResource +
" - clusterUsage=" + clusterUsage + ")" +
"Headroom=" + headroom);
}
return headroom;
}
public synchronized float getLocalityWaitFactor( public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) { Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks) // Estimate: Required unique resources (i.e. hosts + racks)

View File

@ -175,4 +175,19 @@ public abstract class SchedulingPolicy {
*/ */
public abstract boolean checkIfAMResourceUsageOverLimit( public abstract boolean checkIfAMResourceUsageOverLimit(
Resource usage, Resource maxAMResource); Resource usage, Resource maxAMResource);
/**
* Get headroom by calculating the min of <code>clusterAvailable</code> and
* (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
* applicable to this policy. For eg if only memory then leave other
* resources such as CPU to same as clusterAvailable.
*
* @param queueFairShare fairshare in the queue
* @param queueUsage resources used in the queue
* @param clusterAvailable available resource in cluster
* @return calculated headroom
*/
public abstract Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable);
} }

View File

@ -88,6 +88,21 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
return !Resources.fitsIn(usage, maxAMResource); return !Resources.fitsIn(usage, maxAMResource);
} }
@Override
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
Resource clusterAvailable) {
int queueAvailableMemory =
Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
int queueAvailableCPU =
Math.max(queueFairShare.getVirtualCores() - queueUsage
.getVirtualCores(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
Math.min(clusterAvailable.getVirtualCores(),
queueAvailableCPU));
return headroom;
}
@Override @Override
public void initialize(Resource clusterCapacity) { public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity); comparator.setClusterCapacity(clusterCapacity);

View File

@ -114,6 +114,17 @@ public class FairSharePolicy extends SchedulingPolicy {
return comparator; return comparator;
} }
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores());
return headroom;
}
@Override @Override
public void computeShares(Collection<? extends Schedulable> schedulables, public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) { Resource totalResources) {

View File

@ -107,6 +107,18 @@ public class FifoPolicy extends SchedulingPolicy {
return usage.getMemory() > maxAMResource.getMemory(); return usage.getMemory() > maxAMResource.getMemory();
} }
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores());
return headroom;
}
@Override @Override
public byte getApplicableDepth() { public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF; return SchedulingPolicy.DEPTH_LEAF;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -26,7 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -185,4 +191,61 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0)); prio, 10, -1.0, -1.0));
} }
@Test
public void testHeadroom() {
final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
final Resource queueFairShare = Resources.createResource(4096, 4);
final Resource queueUsage = Resource.newInstance(1024, 1);
final Resource clusterResource = Resources.createResource(8192, 8);
final Resource clusterUsage = Resources.createResource(6144, 2);
final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
null, rmContext);
Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
Mockito.when(mockScheduler.getClusterResource()).thenReturn
(clusterResource);
Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
(clusterUsage);
Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
(fakeRootQueueMetrics);
int minClusterAvailableMemory = 2048;
int minClusterAvailableCPU = 6;
int minQueueAvailableCPU = 3;
// Min of Memory and CPU across cluster and queue is used in
// DominantResourceFairnessPolicy
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(DominantResourceFairnessPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minQueueAvailableCPU);
// Fair and Fifo ignore CPU of queue, so use cluster available CPU
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FairSharePolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minClusterAvailableCPU);
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FifoPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minClusterAvailableCPU);
}
protected void verifyHeadroom(FSAppAttempt schedulerApp,
int expectedMemory, int expectedCPU) {
Resource headroom = schedulerApp.getHeadroom();
assertEquals(expectedMemory, headroom.getMemory());
assertEquals(expectedCPU, headroom.getVirtualCores());
}
} }