YARN-2608. FairScheduler: Potential deadlocks in loading alloc files and clock access. (Wei Yan via kasha)
(cherry picked from commit c9811af09a3d3f9f2f1b86fc9d6f2763d3225e44)
This commit is contained in:
parent
9ad9e51cd9
commit
b923c291b4
|
@ -420,6 +420,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2523. ResourceManager UI showing negative value for "Decommissioned
|
YARN-2523. ResourceManager UI showing negative value for "Decommissioned
|
||||||
Nodes" field (Rohith via jlowe)
|
Nodes" field (Rohith via jlowe)
|
||||||
|
|
||||||
|
YARN-2608. FairScheduler: Potential deadlocks in loading alloc files and
|
||||||
|
clock access. (Wei Yan via kasha)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class FairScheduler extends
|
||||||
|
|
||||||
private Resource incrAllocation;
|
private Resource incrAllocation;
|
||||||
private QueueManager queueMgr;
|
private QueueManager queueMgr;
|
||||||
private Clock clock;
|
private volatile Clock clock;
|
||||||
private boolean usePortForNodeName;
|
private boolean usePortForNodeName;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
||||||
|
@ -555,11 +555,12 @@ public class FairScheduler extends
|
||||||
return continuousSchedulingSleepMs;
|
return continuousSchedulingSleepMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized Clock getClock() {
|
public Clock getClock() {
|
||||||
return clock;
|
return clock;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void setClock(Clock clock) {
|
@VisibleForTesting
|
||||||
|
void setClock(Clock clock) {
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1204,64 +1205,65 @@ public class FairScheduler extends
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void initScheduler(Configuration conf)
|
private void initScheduler(Configuration conf) throws IOException {
|
||||||
throws IOException {
|
synchronized (this) {
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
this.conf = new FairSchedulerConfiguration(conf);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
minimumAllocation = this.conf.getMinimumAllocation();
|
minimumAllocation = this.conf.getMinimumAllocation();
|
||||||
maximumAllocation = this.conf.getMaximumAllocation();
|
maximumAllocation = this.conf.getMaximumAllocation();
|
||||||
incrAllocation = this.conf.getIncrementAllocation();
|
incrAllocation = this.conf.getIncrementAllocation();
|
||||||
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
||||||
continuousSchedulingSleepMs =
|
continuousSchedulingSleepMs =
|
||||||
this.conf.getContinuousSchedulingSleepMs();
|
this.conf.getContinuousSchedulingSleepMs();
|
||||||
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
||||||
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
||||||
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
|
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
|
||||||
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
|
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
|
||||||
preemptionEnabled = this.conf.getPreemptionEnabled();
|
preemptionEnabled = this.conf.getPreemptionEnabled();
|
||||||
preemptionUtilizationThreshold =
|
preemptionUtilizationThreshold =
|
||||||
this.conf.getPreemptionUtilizationThreshold();
|
this.conf.getPreemptionUtilizationThreshold();
|
||||||
assignMultiple = this.conf.getAssignMultiple();
|
assignMultiple = this.conf.getAssignMultiple();
|
||||||
maxAssign = this.conf.getMaxAssign();
|
maxAssign = this.conf.getMaxAssign();
|
||||||
sizeBasedWeight = this.conf.getSizeBasedWeight();
|
sizeBasedWeight = this.conf.getSizeBasedWeight();
|
||||||
preemptionInterval = this.conf.getPreemptionInterval();
|
preemptionInterval = this.conf.getPreemptionInterval();
|
||||||
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
||||||
usePortForNodeName = this.conf.getUsePortForNodeName();
|
usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||||
|
|
||||||
updateInterval = this.conf.getUpdateInterval();
|
updateInterval = this.conf.getUpdateInterval();
|
||||||
if (updateInterval < 0) {
|
if (updateInterval < 0) {
|
||||||
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
|
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
|
||||||
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
|
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
|
||||||
+ " is invalid, so using default value " +
|
+ " is invalid, so using default value " +
|
||||||
+ FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
|
+FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
|
||||||
+ " ms instead");
|
+ " ms instead");
|
||||||
}
|
}
|
||||||
|
|
||||||
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
||||||
fsOpDurations = FSOpDurations.getInstance(true);
|
fsOpDurations = FSOpDurations.getInstance(true);
|
||||||
|
|
||||||
// This stores per-application scheduling information
|
// This stores per-application scheduling information
|
||||||
this.applications = new ConcurrentHashMap<
|
this.applications = new ConcurrentHashMap<
|
||||||
ApplicationId, SchedulerApplication<FSAppAttempt>>();
|
ApplicationId, SchedulerApplication<FSAppAttempt>>();
|
||||||
this.eventLog = new FairSchedulerEventLog();
|
this.eventLog = new FairSchedulerEventLog();
|
||||||
eventLog.init(this.conf);
|
eventLog.init(this.conf);
|
||||||
|
|
||||||
allocConf = new AllocationConfiguration(conf);
|
allocConf = new AllocationConfiguration(conf);
|
||||||
try {
|
try {
|
||||||
queueMgr.initialize(conf);
|
queueMgr.initialize(conf);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Failed to start FairScheduler", e);
|
throw new IOException("Failed to start FairScheduler", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateThread = new UpdateThread();
|
updateThread = new UpdateThread();
|
||||||
updateThread.setName("FairSchedulerUpdateThread");
|
updateThread.setName("FairSchedulerUpdateThread");
|
||||||
updateThread.setDaemon(true);
|
updateThread.setDaemon(true);
|
||||||
|
|
||||||
if (continuousSchedulingEnabled) {
|
if (continuousSchedulingEnabled) {
|
||||||
// start continuous scheduling thread
|
// start continuous scheduling thread
|
||||||
schedulingThread = new ContinuousSchedulingThread();
|
schedulingThread = new ContinuousSchedulingThread();
|
||||||
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
||||||
schedulingThread.setDaemon(true);
|
schedulingThread.setDaemon(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
allocsLoader.init(conf);
|
allocsLoader.init(conf);
|
||||||
|
@ -1321,7 +1323,7 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
|
public void reinitialize(Configuration conf, RMContext rmContext)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
allocsLoader.reloadAllocations();
|
allocsLoader.reloadAllocations();
|
||||||
|
|
Loading…
Reference in New Issue