YARN-6194. Cluster capacity in SchedulingPolicy is updated only on allocation file reload. (Yufei Gu via kasha)

(cherry picked from commit b10e962224)
This commit is contained in:
Karthik Kambatla 2017-02-22 15:58:49 -08:00
parent ca7a6a7365
commit 37edbd35f1
7 changed files with 74 additions and 23 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Resource;
/** /**
* Helper class that holds basic information to be passed around * Helper class that holds basic information to be passed around
* FairScheduler classes. Think of this as a glorified map that holds key * FairScheduler classes. Think of this as a glorified map that holds key
@ -27,28 +29,37 @@ public class FSContext {
private boolean preemptionEnabled = false; private boolean preemptionEnabled = false;
private float preemptionUtilizationThreshold; private float preemptionUtilizationThreshold;
private FSStarvedApps starvedApps; private FSStarvedApps starvedApps;
private FairScheduler scheduler;
public boolean isPreemptionEnabled() { FSContext(FairScheduler scheduler) {
this.scheduler = scheduler;
}
boolean isPreemptionEnabled() {
return preemptionEnabled; return preemptionEnabled;
} }
public void setPreemptionEnabled() { void setPreemptionEnabled() {
this.preemptionEnabled = true; this.preemptionEnabled = true;
if (starvedApps == null) { if (starvedApps == null) {
starvedApps = new FSStarvedApps(); starvedApps = new FSStarvedApps();
} }
} }
public FSStarvedApps getStarvedApps() { FSStarvedApps getStarvedApps() {
return starvedApps; return starvedApps;
} }
public float getPreemptionUtilizationThreshold() { float getPreemptionUtilizationThreshold() {
return preemptionUtilizationThreshold; return preemptionUtilizationThreshold;
} }
public void setPreemptionUtilizationThreshold( void setPreemptionUtilizationThreshold(
float preemptionUtilizationThreshold) { float preemptionUtilizationThreshold) {
this.preemptionUtilizationThreshold = preemptionUtilizationThreshold; this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
} }
public Resource getClusterResource() {
return scheduler.getClusterResource();
}
} }

View File

@ -135,7 +135,7 @@ public abstract class FSQueue implements Queue, Schedulable {
} }
public void setPolicy(SchedulingPolicy policy) { public void setPolicy(SchedulingPolicy policy) {
policy.initialize(scheduler.getClusterResource()); policy.initialize(scheduler.getContext());
this.policy = policy; this.policy = policy;
} }

View File

@ -206,13 +206,12 @@ public class FairScheduler extends
public FairScheduler() { public FairScheduler() {
super(FairScheduler.class.getName()); super(FairScheduler.class.getName());
context = new FSContext(); context = new FSContext(this);
allocsLoader = new AllocationFileLoaderService(); allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this); queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
} }
@VisibleForTesting
public FSContext getContext() { public FSContext getContext() {
return context; return context;
} }
@ -1450,8 +1449,7 @@ public class FairScheduler extends
} else { } else {
allocConf = queueInfo; allocConf = queueInfo;
setQueueAcls(allocConf.getQueueAcls()); setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize( allocConf.getDefaultSchedulingPolicy().initialize(getContext());
getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf); queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults(); applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload(); maxRunningEnforcer.updateRunnabilityOnReload();

View File

@ -91,9 +91,26 @@ public abstract class SchedulingPolicy {
} }
return getInstance(clazz); return getInstance(clazz);
} }
/**
* Initialize the scheduling policy with cluster resources.
* @deprecated Since it doesn't track cluster resource changes, replaced by
* {@link #initialize(FSContext)}.
*
* @param clusterCapacity cluster resources
*/
@Deprecated
public void initialize(Resource clusterCapacity) {} public void initialize(Resource clusterCapacity) {}
/**
* Initialize the scheduling policy with a {@link FSContext} object, which has
* a pointer to the cluster resources among other information.
*
* @param fsContext a {@link FSContext} object which has a pointer to the
* cluster resources
*/
public void initialize(FSContext fsContext) {}
/** /**
* The {@link ResourceCalculator} returned by this method should be used * The {@link ResourceCalculator} returned by this method should be used
* for any calculations involving resources. * for any calculations involving resources.

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
@ -104,17 +105,17 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
} }
@Override @Override
public void initialize(Resource clusterCapacity) { public void initialize(FSContext fsContext) {
COMPARATOR.setClusterCapacity(clusterCapacity); COMPARATOR.setFSContext(fsContext);
} }
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> { public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
private static final int NUM_RESOURCES = ResourceType.values().length; private static final int NUM_RESOURCES = ResourceType.values().length;
private Resource clusterCapacity;
public void setClusterCapacity(Resource clusterCapacity) { private FSContext fsContext;
this.clusterCapacity = clusterCapacity;
public void setFSContext(FSContext fsContext) {
this.fsContext = fsContext;
} }
@Override @Override
@ -125,7 +126,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
ResourceWeights sharesOfMinShare2 = new ResourceWeights(); ResourceWeights sharesOfMinShare2 = new ResourceWeights();
ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES]; ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES]; ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
Resource clusterCapacity = fsContext.getClusterResource();
// Calculate shares of the cluster for each resource both schedulables. // Calculate shares of the cluster for each resource both schedulables.
calculateShares(s1.getResourceUsage(), calculateShares(s1.getResourceUsage(),
clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights()); clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());

View File

@ -3285,7 +3285,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource()); drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update(); scheduler.update();
@ -3331,7 +3331,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource()); drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update(); scheduler.update();
@ -3346,7 +3346,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size()); Assert.assertEquals(1, app2.getLiveContainers().size());
} }
@Test @Test
public void testDRFHierarchicalQueues() throws Exception { public void testDRFHierarchicalQueues() throws Exception {
scheduler.init(conf); scheduler.init(conf);
@ -3376,7 +3376,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource()); drfPolicy.initialize(scheduler.getContext());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);

View File

@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Comparator; import java.util.Comparator;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -40,7 +43,10 @@ public class TestDominantResourceFairnessPolicy {
private Comparator<Schedulable> createComparator(int clusterMem, private Comparator<Schedulable> createComparator(int clusterMem,
int clusterCpu) { int clusterCpu) {
DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu)); FSContext fsContext = mock(FSContext.class);
when(fsContext.getClusterResource()).
thenReturn(Resources.createResource(clusterMem, clusterCpu));
policy.initialize(fsContext);
return policy.getComparator(); return policy.getComparator();
} }
@ -160,4 +166,21 @@ public class TestDominantResourceFairnessPolicy {
assertEquals(ResourceType.CPU, resourceOrder[0]); assertEquals(ResourceType.CPU, resourceOrder[0]);
assertEquals(ResourceType.MEMORY, resourceOrder[1]); assertEquals(ResourceType.MEMORY, resourceOrder[1]);
} }
@Test
public void testCompareSchedulablesWithClusterResourceChanges(){
Schedulable schedulable1 = createSchedulable(2000, 1);
Schedulable schedulable2 = createSchedulable(1000, 2);
// schedulable1 has share weights [1/2, 1/5], schedulable2 has share
// weights [1/4, 2/5], schedulable1 > schedulable2 since 1/2 > 2/5
assertTrue(createComparator(4000, 5)
.compare(schedulable1, schedulable2) > 0);
// share weights have changed because of the cluster resource change.
// schedulable1 has share weights [1/4, 1/6], schedulable2 has share
// weights [1/8, 1/3], schedulable1 < schedulable2 since 1/4 < 1/3
assertTrue(createComparator(8000, 6)
.compare(schedulable1, schedulable2) < 0);
}
} }