YARN-496. Fair scheduler configs are refreshed inconsistently in reinitialize. Contributed by Sandy Ryza
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461614 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c9f5052803
commit
414458ff9d
|
@ -146,6 +146,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-474. Fix CapacityScheduler to trigger application-activation when
|
YARN-474. Fix CapacityScheduler to trigger application-activation when
|
||||||
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
|
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-496. Fair scheduler configs are refreshed inconsistently in
|
||||||
|
reinitialize. (Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -921,25 +921,25 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
|
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
this.conf = new FairSchedulerConfiguration(conf);
|
||||||
|
minimumAllocation = this.conf.getMinimumMemoryAllocation();
|
||||||
|
maximumAllocation = this.conf.getMaximumMemoryAllocation();
|
||||||
|
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
|
||||||
|
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
||||||
|
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
||||||
|
preemptionEnabled = this.conf.getPreemptionEnabled();
|
||||||
|
assignMultiple = this.conf.getAssignMultiple();
|
||||||
|
maxAssign = this.conf.getMaxAssign();
|
||||||
|
sizeBasedWeight = this.conf.getSizeBasedWeight();
|
||||||
|
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
|
||||||
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
|
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
this.eventLog = new FairSchedulerEventLog();
|
this.eventLog = new FairSchedulerEventLog();
|
||||||
eventLog.init(this.conf);
|
eventLog.init(this.conf);
|
||||||
minimumAllocation = this.conf.getMinimumMemoryAllocation();
|
|
||||||
maximumAllocation = this.conf.getMaximumMemoryAllocation();
|
|
||||||
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
|
|
||||||
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
|
||||||
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
|
||||||
preemptionEnabled = this.conf.getPreemptionEnabled();
|
|
||||||
assignMultiple = this.conf.getAssignMultiple();
|
|
||||||
maxAssign = this.conf.getMaxAssign();
|
|
||||||
|
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
|
||||||
sizeBasedWeight = this.conf.getSizeBasedWeight();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
queueMgr.initialize();
|
queueMgr.initialize();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -951,14 +951,8 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
updateThread.setDaemon(true);
|
updateThread.setDaemon(true);
|
||||||
updateThread.start();
|
updateThread.start();
|
||||||
} else {
|
} else {
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
|
||||||
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
|
|
||||||
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
|
||||||
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
|
||||||
preemptionEnabled = this.conf.getPreemptionEnabled();
|
|
||||||
try {
|
try {
|
||||||
queueMgr.reloadAllocs();
|
queueMgr.reloadAllocs();
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Failed to initialize FairScheduler", e);
|
throw new IOException("Failed to initialize FairScheduler", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,6 @@ public class FairSchedulerConfiguration extends Configuration {
|
||||||
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
|
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
|
||||||
protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
|
protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
|
||||||
|
|
||||||
protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
|
|
||||||
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
|
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
|
||||||
|
|
||||||
/** Cluster threshold for node locality. */
|
/** Cluster threshold for node locality. */
|
||||||
|
@ -89,10 +88,6 @@ public class FairSchedulerConfiguration extends Configuration {
|
||||||
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
|
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public float getLocalityThreshold() {
|
|
||||||
return getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
|
|
||||||
}
|
|
||||||
|
|
||||||
public float getLocalityThresholdNode() {
|
public float getLocalityThresholdNode() {
|
||||||
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
|
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,6 +184,26 @@ public class TestFairScheduler {
|
||||||
|
|
||||||
// TESTS
|
// TESTS
|
||||||
|
|
||||||
|
@Test(timeout=2000)
|
||||||
|
public void testLoadConfigurationOnInitialize() throws IOException {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
||||||
|
conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3);
|
||||||
|
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
|
||||||
|
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5);
|
||||||
|
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
Assert.assertEquals(true, scheduler.assignMultiple);
|
||||||
|
Assert.assertEquals(3, scheduler.maxAssign);
|
||||||
|
Assert.assertEquals(true, scheduler.sizeBasedWeight);
|
||||||
|
Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01);
|
||||||
|
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
|
||||||
|
Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
|
||||||
|
Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAggregateCapacityTracking() throws Exception {
|
public void testAggregateCapacityTracking() throws Exception {
|
||||||
// Add a node
|
// Add a node
|
||||||
|
|
Loading…
Reference in New Issue