YARN-3485. FairScheduler headroom calculation doesn't consider maxResources for Fifo and FairShare policies. (kasha)

(cherry picked from commit 8f82970e0c)
(cherry picked from commit 4cc38df7de)
This commit is contained in:
Karthik Kambatla 2015-04-28 21:00:35 -07:00
parent 4b9147ef8c
commit 396c41b1d4
7 changed files with 76 additions and 35 deletions

View File

@ -43,6 +43,9 @@ Release 2.7.1 - UNRELEASED
YARN-3464. Race condition in LocalizerRunner kills localizer before YARN-3464. Race condition in LocalizerRunner kills localizer before
localizing all resources. (Zhihai Xu via kasha) localizing all resources. (Zhihai Xu via kasha)
YARN-3485. FairScheduler headroom calculation doesn't consider
maxResources for Fifo and FairShare policies. (kasha)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20

View File

@ -171,6 +171,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ priority + "; currentReservation " + currentReservation); + priority + "; currentReservation " + currentReservation);
} }
/**
* Headroom depends on resources in the cluster, current usage of the
* queue, queue's fair-share and queue's max-resources.
*/
@Override @Override
public Resource getHeadroom() { public Resource getHeadroom() {
final FSQueue queue = (FSQueue) this.queue; final FSQueue queue = (FSQueue) this.queue;
@ -181,18 +185,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource clusterResource = this.scheduler.getClusterResource(); Resource clusterResource = this.scheduler.getClusterResource();
Resource clusterUsage = this.scheduler.getRootQueueMetrics() Resource clusterUsage = this.scheduler.getRootQueueMetrics()
.getAllocatedResources(); .getAllocatedResources();
Resource clusterAvailableResource = Resources.subtract(clusterResource,
clusterUsage); Resource clusterAvailableResources =
Resources.subtract(clusterResource, clusterUsage);
Resource queueMaxAvailableResources =
Resources.subtract(queue.getMaxShare(), queueUsage);
Resource maxAvailableResource = Resources.componentwiseMin(
clusterAvailableResources, queueMaxAvailableResources);
Resource headroom = policy.getHeadroom(queueFairShare, Resource headroom = policy.getHeadroom(queueFairShare,
queueUsage, clusterAvailableResource); queueUsage, maxAvailableResource);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for " + this.getName() + ":" + LOG.debug("Headroom calculation for " + this.getName() + ":" +
"Min(" + "Min(" +
"(queueFairShare=" + queueFairShare + "(queueFairShare=" + queueFairShare +
" - queueUsage=" + queueUsage + ")," + " - queueUsage=" + queueUsage + ")," +
" clusterAvailableResource=" + clusterAvailableResource + " maxAvailableResource=" + maxAvailableResource +
"(clusterResource=" + clusterResource +
" - clusterUsage=" + clusterUsage + ")" +
"Headroom=" + headroom); "Headroom=" + headroom);
} }
return headroom; return headroom;

View File

@ -185,10 +185,10 @@ public abstract class SchedulingPolicy {
* *
* @param queueFairShare fairshare in the queue * @param queueFairShare fairshare in the queue
* @param queueUsage resources used in the queue * @param queueUsage resources used in the queue
* @param clusterAvailable available resource in cluster * @param maxAvailable available resource in cluster for this queue
* @return calculated headroom * @return calculated headroom
*/ */
public abstract Resource getHeadroom(Resource queueFairShare, public abstract Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable); Resource queueUsage, Resource maxAvailable);
} }

View File

@ -90,15 +90,15 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
@Override @Override
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
Resource clusterAvailable) { Resource maxAvailable) {
int queueAvailableMemory = int queueAvailableMemory =
Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0); Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
int queueAvailableCPU = int queueAvailableCPU =
Math.max(queueFairShare.getVirtualCores() - queueUsage Math.max(queueFairShare.getVirtualCores() - queueUsage
.getVirtualCores(), 0); .getVirtualCores(), 0);
Resource headroom = Resources.createResource( Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory), Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(clusterAvailable.getVirtualCores(), Math.min(maxAvailable.getVirtualCores(),
queueAvailableCPU)); queueAvailableCPU));
return headroom; return headroom;
} }

View File

@ -116,12 +116,12 @@ public class FairSharePolicy extends SchedulingPolicy {
@Override @Override
public Resource getHeadroom(Resource queueFairShare, public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) { Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max( int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0); queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource( Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory), Math.min(maxAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores()); maxAvailable.getVirtualCores());
return headroom; return headroom;
} }

View File

@ -109,12 +109,12 @@ public class FifoPolicy extends SchedulingPolicy {
@Override @Override
public Resource getHeadroom(Resource queueFairShare, public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) { Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max( int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0); queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource( Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory), Math.min(maxAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores()); maxAvailable.getVirtualCores());
return headroom; return headroom;
} }

View File

@ -198,18 +198,24 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock()); Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class); final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
final Resource queueFairShare = Resources.createResource(4096, 4);
final Resource queueUsage = Resource.newInstance(1024, 1); final Resource queueMaxResources = Resource.newInstance(5 * 1024, 3);
final Resource queueFairShare = Resources.createResource(4096, 2);
final Resource queueUsage = Resource.newInstance(2048, 2);
final Resource queueStarvation =
Resources.subtract(queueFairShare, queueUsage);
final Resource queueMaxResourcesAvailable =
Resources.subtract(queueMaxResources, queueUsage);
final Resource clusterResource = Resources.createResource(8192, 8); final Resource clusterResource = Resources.createResource(8192, 8);
final Resource clusterUsage = Resources.createResource(6144, 2); final Resource clusterUsage = Resources.createResource(2048, 2);
final Resource clusterAvailable =
Resources.subtract(clusterResource, clusterUsage);
final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class); final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
null, rmContext);
Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare); Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage); Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
Mockito.when(mockScheduler.getClusterResource()).thenReturn Mockito.when(mockScheduler.getClusterResource()).thenReturn
@ -219,27 +225,51 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
(fakeRootQueueMetrics); (fakeRootQueueMetrics);
int minClusterAvailableMemory = 2048; ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
int minClusterAvailableCPU = 6; RMContext rmContext = resourceManager.getRMContext();
int minQueueAvailableCPU = 3; FSAppAttempt schedulerApp =
new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
null, rmContext);
// Min of Memory and CPU across cluster and queue is used in // Min of Memory and CPU across cluster and queue is used in
// DominantResourceFairnessPolicy // DominantResourceFairnessPolicy
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(DominantResourceFairnessPolicy.class)); .getInstance(DominantResourceFairnessPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory, verifyHeadroom(schedulerApp,
minQueueAvailableCPU); min(queueStarvation.getMemory(),
clusterAvailable.getMemory(),
queueMaxResourcesAvailable.getMemory()),
min(queueStarvation.getVirtualCores(),
clusterAvailable.getVirtualCores(),
queueMaxResourcesAvailable.getVirtualCores())
);
// Fair and Fifo ignore CPU of queue, so use cluster available CPU // Fair and Fifo ignore CPU of queue, so use cluster available CPU
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FairSharePolicy.class)); .getInstance(FairSharePolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory, verifyHeadroom(schedulerApp,
minClusterAvailableCPU); min(queueStarvation.getMemory(),
clusterAvailable.getMemory(),
queueMaxResourcesAvailable.getMemory()),
Math.min(
clusterAvailable.getVirtualCores(),
queueMaxResourcesAvailable.getVirtualCores())
);
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FifoPolicy.class)); .getInstance(FifoPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory, verifyHeadroom(schedulerApp,
minClusterAvailableCPU); min(queueStarvation.getMemory(),
clusterAvailable.getMemory(),
queueMaxResourcesAvailable.getMemory()),
Math.min(
clusterAvailable.getVirtualCores(),
queueMaxResourcesAvailable.getVirtualCores())
);
}
private static int min(int value1, int value2, int value3) {
return Math.min(Math.min(value1, value2), value3);
} }
protected void verifyHeadroom(FSAppAttempt schedulerApp, protected void verifyHeadroom(FSAppAttempt schedulerApp,