YARN-496. Fair scheduler configs are refreshed inconsistently in reinitialize. Contributed by Sandy Ryza

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461667 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2013-03-27 15:51:20 +00:00
parent 81299911b4
commit f26fa51029
4 changed files with 34 additions and 22 deletions

View File

@ -88,6 +88,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-498. Unmanaged AM launcher does not set various constants in env for YARN-498. Unmanaged AM launcher does not set various constants in env for
an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas) an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas)
YARN-496. Fair scheduler configs are refreshed inconsistently in
reinitialize. (Sandy Ryza via tomwhite)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -917,25 +917,25 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext) public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException { throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
maximumAllocation = this.conf.getMaximumMemoryAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
if (!initialized) { if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf); rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext; this.rmContext = rmContext;
this.eventLog = new FairSchedulerEventLog(); this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf); eventLog.init(this.conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
maximumAllocation = this.conf.getMaximumMemoryAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
initialized = true; initialized = true;
sizeBasedWeight = this.conf.getSizeBasedWeight();
try { try {
queueMgr.initialize(); queueMgr.initialize();
} catch (Exception e) { } catch (Exception e) {
@ -947,14 +947,8 @@ public class FairScheduler implements ResourceScheduler {
updateThread.setDaemon(true); updateThread.setDaemon(true);
updateThread.start(); updateThread.start();
} else { } else {
this.conf = new FairSchedulerConfiguration(conf);
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try { try {
queueMgr.reloadAllocs(); queueMgr.reloadAllocs();
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e); throw new IOException("Failed to initialize FairScheduler", e);
} }

View File

@ -37,7 +37,6 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue"; protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true; protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f; protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
/** Cluster threshold for node locality. */ /** Cluster threshold for node locality. */
@ -89,10 +88,6 @@ public class FairSchedulerConfiguration extends Configuration {
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE); return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
} }
public float getLocalityThreshold() {
return getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
}
public float getLocalityThresholdNode() { public float getLocalityThresholdNode() {
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE); return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
} }

View File

@ -175,6 +175,26 @@ public class TestFairScheduler {
// TESTS // TESTS
@Test(timeout=2000)
public void testLoadConfigurationOnInitialize() throws IOException {
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3);
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
conf.setFloat(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5f);
conf.setFloat(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7f);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign);
Assert.assertEquals(true, scheduler.sizeBasedWeight);
Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01);
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
}
@Test @Test
public void testAggregateCapacityTracking() throws Exception { public void testAggregateCapacityTracking() throws Exception {
// Add a node // Add a node