YARN-6194. Cluster capacity in SchedulingPolicy is updated only on allocation file reload. (Yufei Gu via kasha)

This commit is contained in:
Karthik Kambatla 2017-02-22 15:58:49 -08:00
parent 718ad9f6ee
commit b10e962224
7 changed files with 74 additions and 23 deletions

View File

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Helper class that holds basic information to be passed around
* FairScheduler classes. Think of this as a glorified map that holds key
@@ -27,28 +29,37 @@ public class FSContext {
private boolean preemptionEnabled = false;
private float preemptionUtilizationThreshold;
private FSStarvedApps starvedApps;
private FairScheduler scheduler;
public boolean isPreemptionEnabled() {
FSContext(FairScheduler scheduler) {
this.scheduler = scheduler;
}
boolean isPreemptionEnabled() {
return preemptionEnabled;
}
public void setPreemptionEnabled() {
void setPreemptionEnabled() {
this.preemptionEnabled = true;
if (starvedApps == null) {
starvedApps = new FSStarvedApps();
}
}
public FSStarvedApps getStarvedApps() {
FSStarvedApps getStarvedApps() {
return starvedApps;
}
public float getPreemptionUtilizationThreshold() {
float getPreemptionUtilizationThreshold() {
return preemptionUtilizationThreshold;
}
public void setPreemptionUtilizationThreshold(
void setPreemptionUtilizationThreshold(
float preemptionUtilizationThreshold) {
this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
}
public Resource getClusterResource() {
return scheduler.getClusterResource();
}
}

View File

@@ -135,7 +135,7 @@ public FSParentQueue getParent() {
}
public void setPolicy(SchedulingPolicy policy) {
policy.initialize(scheduler.getClusterResource());
policy.initialize(scheduler.getContext());
this.policy = policy;
}

View File

@@ -205,13 +205,12 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext();
context = new FSContext(this);
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@VisibleForTesting
public FSContext getContext() {
return context;
}
@@ -1452,8 +1451,7 @@ public void onReload(AllocationConfiguration queueInfo)
} else {
allocConf = queueInfo;
setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize(
getClusterResource());
allocConf.getDefaultSchedulingPolicy().initialize(getContext());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();

View File

@@ -91,9 +91,26 @@ public static SchedulingPolicy parse(String policy)
}
return getInstance(clazz);
}
/**
* Initialize the scheduling policy with cluster resources.
* @deprecated Since it doesn't track cluster resource changes, replaced by
* {@link #initialize(FSContext)}.
*
* @param clusterCapacity cluster resources
*/
@Deprecated
public void initialize(Resource clusterCapacity) {}
/**
* Initialize the scheduling policy with a {@link FSContext} object, which has
* a pointer to the cluster resources among other information.
*
* @param fsContext a {@link FSContext} object which has a pointer to the
* cluster resources
*/
public void initialize(FSContext fsContext) {}
/**
* The {@link ResourceCalculator} returned by this method should be used
* for any calculations involving resources.

View File

@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
@@ -104,17 +105,17 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
}
@Override
public void initialize(Resource clusterCapacity) {
COMPARATOR.setClusterCapacity(clusterCapacity);
public void initialize(FSContext fsContext) {
COMPARATOR.setFSContext(fsContext);
}
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
private static final int NUM_RESOURCES = ResourceType.values().length;
private Resource clusterCapacity;
public void setClusterCapacity(Resource clusterCapacity) {
this.clusterCapacity = clusterCapacity;
private FSContext fsContext;
public void setFSContext(FSContext fsContext) {
this.fsContext = fsContext;
}
@Override
@@ -125,7 +126,8 @@ public int compare(Schedulable s1, Schedulable s2) {
ResourceWeights sharesOfMinShare2 = new ResourceWeights();
ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
Resource clusterCapacity = fsContext.getClusterResource();
// Calculate shares of the cluster for each resource both schedulables.
calculateShares(s1.getResourceUsage(),
clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());

View File

@@ -3293,7 +3293,7 @@ public void testBasicDRFAssignment() throws Exception {
FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -3339,7 +3339,7 @@ public void testBasicDRFWithQueues() throws Exception {
FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -3354,7 +3354,7 @@ public void testBasicDRFWithQueues() throws Exception {
scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size());
}
@Test
public void testDRFHierarchicalQueues() throws Exception {
scheduler.init(conf);
@@ -3384,7 +3384,7 @@ public void testDRFHierarchicalQueues() throws Exception {
FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);

View File

@@ -19,12 +19,15 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Comparator;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -40,7 +43,10 @@ public class TestDominantResourceFairnessPolicy {
private Comparator<Schedulable> createComparator(int clusterMem,
int clusterCpu) {
DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu));
FSContext fsContext = mock(FSContext.class);
when(fsContext.getClusterResource()).
thenReturn(Resources.createResource(clusterMem, clusterCpu));
policy.initialize(fsContext);
return policy.getComparator();
}
@@ -160,4 +166,21 @@ public void testCalculateShares() {
assertEquals(ResourceType.CPU, resourceOrder[0]);
assertEquals(ResourceType.MEMORY, resourceOrder[1]);
}
@Test
public void testCompareSchedulablesWithClusterResourceChanges(){
Schedulable schedulable1 = createSchedulable(2000, 1);
Schedulable schedulable2 = createSchedulable(1000, 2);
// schedulable1 has share weights [1/2, 1/5], schedulable2 has share
// weights [1/4, 2/5], schedulable1 > schedulable2 since 1/2 > 2/5
assertTrue(createComparator(4000, 5)
.compare(schedulable1, schedulable2) > 0);
// share weights have changed because of the cluster resource change.
// schedulable1 has share weights [1/4, 1/6], schedulable2 has share
// weights [1/8, 1/3], schedulable1 < schedulable2 since 1/4 < 1/3
assertTrue(createComparator(8000, 6)
.compare(schedulable1, schedulable2) < 0);
}
}