diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9c82619ca64..e32e30eada8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -84,6 +84,9 @@ Release 2.0.3-alpha - Unreleased
YARN-169. Update log4j.appender.EventCounter to use
org.apache.hadoop.log.metrics.EventCounter (Anthony Rojas via tomwhite)
+ YARN-184. Remove unnecessary locking in fair scheduler, and address
+ findbugs excludes. (sandyr via tucu)
+
Release 2.0.2-alpha - 2012-09-07
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index af616f77889..082df546305 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -236,54 +236,5 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f011802f77e..de5de41894f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -94,7 +94,7 @@ public class FairScheduler implements ResourceScheduler {
protected long UPDATE_INTERVAL = 500;
// Whether to use username in place of "default" queue name
- private boolean userAsDefaultQueue = false;
+ private volatile boolean userAsDefaultQueue = false;
private final static List EMPTY_CONTAINER_LIST =
new ArrayList();
@@ -136,7 +136,11 @@ public class FairScheduler implements ResourceScheduler {
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
-
+
+ public FairScheduler() {
+ clock = new SystemClock();
+ queueMgr = new QueueManager(this);
+ }
public FairSchedulerConfiguration getConf() {
return conf;
@@ -166,7 +170,7 @@ private RMContainer getRMContainer(ContainerId containerId) {
*/
private class UpdateThread implements Runnable {
public void run() {
- while (initialized) {
+ while (true) {
try {
Thread.sleep(UPDATE_INTERVAL);
update();
@@ -256,7 +260,7 @@ boolean isStarvedForFairShare(FSQueueSchedulable sched) {
* If such queues exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks.
*/
- protected void preemptTasksIfNecessary() {
+ protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) {
return;
}
@@ -414,7 +418,8 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
- public double getAppWeight(AppSchedulable app) {
+ // synchronized for sizeBasedWeight
+ public synchronized double getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return 1.0;
@@ -885,7 +890,6 @@ public void recover(RMState state) throws Exception {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
- this.clock = new SystemClock();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
@@ -897,21 +901,21 @@ public void recover(RMState state) throws Exception {
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
- Thread updateThread = new Thread(new UpdateThread());
- updateThread.start();
-
initialized = true;
sizeBasedWeight = this.conf.getSizeBasedWeight();
- queueMgr = new QueueManager(this);
-
try {
queueMgr.initialize();
}
catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
+
+ Thread updateThread = new Thread(new UpdateThread());
+ updateThread.setName("FairSchedulerUpdateThread");
+ updateThread.setDaemon(true);
+ updateThread.start();
} else {
this.conf = new FairSchedulerConfiguration(conf);
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 0395eaad5c7..2da306e2dce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -24,7 +24,6 @@
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,40 +70,6 @@ public class QueueManager {
private final FairScheduler scheduler;
- // Minimum resource allocation for each queue
- private Map minQueueResources = new HashMap();
- // Maximum amount of resources per queue
- private Map maxQueueResources = new HashMap();
- // Sharing weights for each queue
- private Map queueWeights = new HashMap();
-
- // Max concurrent running applications for each queue and for each user; in addition,
- // for users that have no max specified, we use the userMaxJobsDefault.
- private Map queueMaxApps = new HashMap();
- private Map userMaxApps = new HashMap();
- private int userMaxAppsDefault = Integer.MAX_VALUE;
- private int queueMaxAppsDefault = Integer.MAX_VALUE;
-
- // ACL's for each queue. Only specifies non-default ACL's from configuration.
- private Map> queueAcls =
- new HashMap>();
-
- // Min share preemption timeout for each queue in seconds. If a job in the queue
- // waits this long without receiving its guaranteed share, it is allowed to
- // preempt other jobs' tasks.
- private Map minSharePreemptionTimeouts =
- new HashMap();
-
- // Default min share preemption timeout for queues where it is not set
- // explicitly.
- private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-
- // Preemption timeout for jobs below fair share in seconds. If a job remains
- // below half its fair share for this long, it is allowed to preempt tasks.
- private long fairSharePreemptionTimeout = Long.MAX_VALUE;
-
- SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
-
private Object allocFile; // Path to XML file containing allocations. This
// is either a URL to specify a classpath resource
// (if the fair-scheduler.xml on the classpath is
@@ -113,40 +78,12 @@ public class QueueManager {
private Map queues = new HashMap();
+ private volatile QueueManagerInfo info = new QueueManagerInfo();
+
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
- // Monitor object for minQueueResources
- private Object minQueueResourcesMO = new Object();
-
- //Monitor object for maxQueueResources
- private Object maxQueueResourcesMO = new Object();
-
- //Monitor object for queueMaxApps
- private Object queueMaxAppsMO = new Object();
-
- //Monitor object for userMaxApps
- private Object userMaxAppsMO = new Object();
-
- //Monitor object for queueWeights
- private Object queueWeightsMO = new Object();
-
- //Monitor object for minSharePreemptionTimeouts
- private Object minSharePreemptionTimeoutsMO = new Object();
-
- //Monitor object for queueAcls
- private Object queueAclsMO = new Object();
-
- //Monitor object for userMaxAppsDefault
- private Object userMaxAppsDefaultMO = new Object();
-
- //Monitor object for queueMaxAppsDefault
- private Object queueMaxAppsDefaultMO = new Object();
-
- //Monitor object for defaultSchedulingMode
- private Object defaultSchedulingModeMO = new Object();
-
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
@@ -180,9 +117,7 @@ public FSQueue getQueue(String name) {
FSQueue queue = queues.get(name);
if (queue == null) {
queue = new FSQueue(scheduler, name);
- synchronized (defaultSchedulingModeMO){
- queue.setSchedulingMode(defaultSchedulingMode);
- }
+ queue.setSchedulingMode(info.defaultSchedulingMode);
queues.put(name, queue);
}
return queue;
@@ -272,6 +207,8 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
new HashMap>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
+ long fairSharePreemptionTimeout = Long.MAX_VALUE;
+ long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all queue names so we can display them on web UI, etc.
@@ -389,16 +326,10 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
synchronized (this) {
- setMinResources(minQueueResources);
- setMaxResources(maxQueueResources);
- setQueueMaxApps(queueMaxApps);
- setUserMaxApps(userMaxApps);
- setQueueWeights(queueWeights);
- setUserMaxAppsDefault(userMaxAppsDefault);
- setQueueMaxAppsDefault(queueMaxAppsDefault);
- setDefaultSchedulingMode(defaultSchedulingMode);
- setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
- setQueueAcls(queueAcls);
+ info = new QueueManagerInfo(minQueueResources, maxQueueResources,
+ queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
+ queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
+ queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name);
if (queueModes.containsKey(name)) {
@@ -428,182 +359,87 @@ private SchedulingMode parseSchedulingMode(String text)
* @return the cap set on this queue, or 0 if not set.
*/
public Resource getMinResources(String queue) {
- synchronized(minQueueResourcesMO) {
- if (minQueueResources.containsKey(queue)) {
- return minQueueResources.get(queue);
- } else {
- return Resources.createResource(0);
- }
+ Resource minQueueResource = info.minQueueResources.get(queue);
+ if (minQueueResource != null) {
+ return minQueueResource;
+ } else {
+ return Resources.createResource(0);
}
}
- private void setMinResources(Map resources) {
- synchronized (minQueueResourcesMO) {
- minQueueResources = resources;
- }
- }
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
*/
+
public Resource getMaxResources(String queueName) {
- synchronized (maxQueueResourcesMO) {
- if (maxQueueResources.containsKey(queueName)) {
- return maxQueueResources.get(queueName);
- } else {
- return Resources.createResource(Integer.MAX_VALUE);
- }
+ Resource maxQueueResource = info.maxQueueResources.get(queueName);
+ if (maxQueueResource != null) {
+ return maxQueueResource;
+ } else {
+ return Resources.createResource(Integer.MAX_VALUE);
}
}
- private void setMaxResources(Map resources) {
- synchronized (maxQueueResourcesMO) {
- maxQueueResources = resources;
- }
- }
-
- /**
- * Add an app in the appropriate queue
- */
- public synchronized void addApp(FSSchedulerApp app) {
- getQueue(app.getQueueName()).addApp(app);
- }
-
- /**
- * Remove an app
- */
- public synchronized void removeApp(FSSchedulerApp app) {
- getQueue(app.getQueueName()).removeApp(app);
- }
-
/**
* Get a collection of all queues
*/
public Collection getQueues() {
synchronized (queues) {
- return queues.values();
+ return new ArrayList(queues.values());
}
}
- /**
- * Get all queue names that have been seen either in the allocation file or in
- * a submitted app.
- */
- public synchronized Collection getQueueNames() {
- List list = new ArrayList();
- for (FSQueue queue: getQueues()) {
- list.add(queue.getName());
- }
- Collections.sort(list);
- return list;
- }
-
public int getUserMaxApps(String user) {
- synchronized (userMaxAppsMO) {
- if (userMaxApps.containsKey(user)) {
- return userMaxApps.get(user);
- } else {
- return getUserMaxAppsDefault();
- }
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.userMaxApps.containsKey(user)) {
+ return info.userMaxApps.get(user);
+ } else {
+ return info.userMaxAppsDefault;
}
}
- private void setUserMaxApps(Map userApps) {
- synchronized (userMaxAppsMO) {
- userMaxApps = userApps;
- }
- }
-
- private int getUserMaxAppsDefault() {
- synchronized (userMaxAppsDefaultMO){
- return userMaxAppsDefault;
- }
- }
-
- private void setUserMaxAppsDefault(int userMaxApps) {
- synchronized (userMaxAppsDefaultMO){
- userMaxAppsDefault = userMaxApps;
- }
- }
-
public int getQueueMaxApps(String queue) {
- synchronized (queueMaxAppsMO) {
- if (queueMaxApps.containsKey(queue)) {
- return queueMaxApps.get(queue);
- } else {
- return getQueueMaxAppsDefault();
- }
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.queueMaxApps.containsKey(queue)) {
+ return info.queueMaxApps.get(queue);
+ } else {
+ return info.queueMaxAppsDefault;
}
}
- private void setQueueMaxApps(Map queueApps) {
- synchronized (queueMaxAppsMO) {
- queueMaxApps = queueApps;
- }
- }
-
- private int getQueueMaxAppsDefault(){
- synchronized (queueMaxAppsDefaultMO) {
- return queueMaxAppsDefault;
- }
- }
-
- private void setQueueMaxAppsDefault(int queueMaxApps){
- synchronized(queueMaxAppsDefaultMO) {
- queueMaxAppsDefault = queueMaxApps;
- }
- }
-
- private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
- synchronized(defaultSchedulingModeMO) {
- defaultSchedulingMode = schedulingMode;
- }
- }
-
public double getQueueWeight(String queue) {
- synchronized (queueWeightsMO) {
- if (queueWeights.containsKey(queue)) {
- return queueWeights.get(queue);
- } else {
- return 1.0;
- }
+ Double weight = info.queueWeights.get(queue);
+ if (weight != null) {
+ return weight;
+ } else {
+ return 1.0;
}
}
- private void setQueueWeights(Map weights) {
- synchronized (queueWeightsMO) {
- queueWeights = weights;
- }
- }
-
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
*/
public long getMinSharePreemptionTimeout(String queueName) {
- synchronized (minSharePreemptionTimeoutsMO) {
- if (minSharePreemptionTimeouts.containsKey(queueName)) {
- return minSharePreemptionTimeouts.get(queueName);
- }
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
+ return info.minSharePreemptionTimeouts.get(queueName);
}
- return defaultMinSharePreemptionTimeout;
+ return info.defaultMinSharePreemptionTimeout;
}
- private void setMinSharePreemptionTimeouts(
- Map sharePreemptionTimeouts){
- synchronized (minSharePreemptionTimeoutsMO) {
- minSharePreemptionTimeouts = sharePreemptionTimeouts;
- }
- }
-
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
*/
public long getFairSharePreemptionTimeout() {
- return fairSharePreemptionTimeout;
+ return info.fairSharePreemptionTimeout;
}
/**
@@ -612,10 +448,9 @@ public long getFairSharePreemptionTimeout() {
*/
public Map getQueueAcls(String queue) {
HashMap out = new HashMap();
- synchronized (queueAclsMO) {
- if (queueAcls.containsKey(queue)) {
- out.putAll(queueAcls.get(queue));
- }
+ Map queueAcl = info.queueAcls.get(queue);
+ if (queueAcl != null) {
+ out.putAll(queueAcl);
}
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@@ -626,9 +461,74 @@ public Map getQueueAcls(String queue) {
return out;
}
- private void setQueueAcls(Map> queue) {
- synchronized (queueAclsMO) {
- queueAcls = queue;
+ static class QueueManagerInfo {
+ // Minimum resource allocation for each queue
+ public final Map minQueueResources;
+ // Maximum amount of resources per queue
+ public final Map maxQueueResources;
+ // Sharing weights for each queue
+ public final Map queueWeights;
+
+ // Max concurrent running applications for each queue and for each user; in addition,
+ // for users that have no max specified, we use the userMaxJobsDefault.
+ public final Map queueMaxApps;
+ public final Map userMaxApps;
+ public final int userMaxAppsDefault;
+ public final int queueMaxAppsDefault;
+
+ // ACL's for each queue. Only specifies non-default ACL's from configuration.
+ public final Map> queueAcls;
+
+ // Min share preemption timeout for each queue in seconds. If a job in the queue
+ // waits this long without receiving its guaranteed share, it is allowed to
+ // preempt other jobs' tasks.
+ public final Map minSharePreemptionTimeouts;
+
+ // Default min share preemption timeout for queues where it is not set
+ // explicitly.
+ public final long defaultMinSharePreemptionTimeout;
+
+ // Preemption timeout for jobs below fair share in seconds. If a job remains
+ // below half its fair share for this long, it is allowed to preempt tasks.
+ public final long fairSharePreemptionTimeout;
+
+ public final SchedulingMode defaultSchedulingMode;
+
+ public QueueManagerInfo(Map minQueueResources,
+ Map maxQueueResources,
+ Map queueMaxApps, Map userMaxApps,
+ Map queueWeights, int userMaxAppsDefault,
+ int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
+ Map minSharePreemptionTimeouts,
+ Map> queueAcls,
+ long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
+ this.minQueueResources = minQueueResources;
+ this.maxQueueResources = maxQueueResources;
+ this.queueMaxApps = queueMaxApps;
+ this.userMaxApps = userMaxApps;
+ this.queueWeights = queueWeights;
+ this.userMaxAppsDefault = userMaxAppsDefault;
+ this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.defaultSchedulingMode = defaultSchedulingMode;
+ this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+ this.queueAcls = queueAcls;
+ this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+ this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+ }
+
+ public QueueManagerInfo() {
+ minQueueResources = new HashMap();
+ maxQueueResources = new HashMap();
+ queueWeights = new HashMap();
+ queueMaxApps = new HashMap();
+ userMaxApps = new HashMap();
+ userMaxAppsDefault = Integer.MAX_VALUE;
+ queueMaxAppsDefault = Integer.MAX_VALUE;
+ queueAcls = new HashMap>();
+ minSharePreemptionTimeouts = new HashMap();
+ defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+ fairSharePreemptionTimeout = Long.MAX_VALUE;
+ defaultSchedulingMode = SchedulingMode.FAIR;
}
}
}