From 94c43d250eb14fbd6c9bfa6810c1282d5da92de2 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Sun, 18 Nov 2012 05:46:29 +0000 Subject: [PATCH] YARN-184. Remove unnecessary locking in fair scheduler, and address findbugs excludes. (sandyr via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1410830 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 51 +-- .../scheduler/fair/FairScheduler.java | 26 +- .../scheduler/fair/QueueManager.java | 330 ++++++------------ 4 files changed, 134 insertions(+), 276 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9c82619ca64..e32e30eada8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -84,6 +84,9 @@ Release 2.0.3-alpha - Unreleased YARN-169. Update log4j.appender.EventCounter to use org.apache.hadoop.log.metrics.EventCounter (Anthony Rojas via tomwhite) + YARN-184. Remove unnecessary locking in fair scheduler, and address + findbugs excludes. (sandyr via tucu) + Release 2.0.2-alpha - 2012-09-07 YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy) diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index af616f77889..082df546305 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -236,54 +236,5 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f011802f77e..de5de41894f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -94,7 +94,7 @@ public class FairScheduler implements ResourceScheduler { protected long UPDATE_INTERVAL = 500; // Whether to use username in place of "default" queue name - private boolean userAsDefaultQueue = false; + private volatile boolean userAsDefaultQueue = false; private final static List EMPTY_CONTAINER_LIST = new ArrayList(); @@ -136,7 +136,11 @@ public class FairScheduler implements ResourceScheduler { private FairSchedulerEventLog eventLog; // Machine-readable event log protected boolean assignMultiple; // Allocate multiple containers per heartbeat protected int maxAssign; // Max containers to assign per heartbeat - + + public FairScheduler() { + clock = new SystemClock(); + queueMgr = new QueueManager(this); + } public FairSchedulerConfiguration getConf() { return conf; @@ -166,7 +170,7 @@ public class FairScheduler implements ResourceScheduler { */ private class UpdateThread implements Runnable { public void run() { - while (initialized) { + while (true) { try { Thread.sleep(UPDATE_INTERVAL); update(); @@ -256,7 +260,7 @@ public class FairScheduler implements ResourceScheduler { * If such queues exist, compute how many tasks of each type need to be * preempted and then select the right ones using preemptTasks. */ - protected void preemptTasksIfNecessary() { + protected synchronized void preemptTasksIfNecessary() { if (!preemptionEnabled) { return; } @@ -414,7 +418,8 @@ public class FairScheduler implements ResourceScheduler { return rmContext.getContainerTokenSecretManager(); } - public double getAppWeight(AppSchedulable app) { + // synchronized for sizeBasedWeight + public synchronized double getAppWeight(AppSchedulable app) { if (!app.getRunnable()) { // Job won't launch tasks, but don't return 0 to avoid division errors return 1.0; @@ -885,7 +890,6 @@ public class FairScheduler implements ResourceScheduler { this.conf = new FairSchedulerConfiguration(conf); rootMetrics = QueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; - this.clock = new SystemClock(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); minimumAllocation = this.conf.getMinimumMemoryAllocation(); @@ -897,21 +901,21 @@ public class FairScheduler implements ResourceScheduler { assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); - Thread updateThread = new Thread(new UpdateThread()); - updateThread.start(); - initialized = true; sizeBasedWeight = this.conf.getSizeBasedWeight(); - queueMgr = new QueueManager(this); - try { queueMgr.initialize(); } catch (Exception e) { throw new IOException("Failed to start FairScheduler", e); } + + Thread updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); + updateThread.start(); } else { this.conf = new FairSchedulerConfiguration(conf); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 0395eaad5c7..2da306e2dce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -24,7 +24,6 @@ import java.net.URL; import java.net.URLConnection; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,40 +70,6 @@ public class QueueManager { private final FairScheduler scheduler; - // Minimum resource allocation for each queue - private Map minQueueResources = new HashMap(); - // Maximum amount of resources per queue - private Map maxQueueResources = new HashMap(); - // Sharing weights for each queue - private Map queueWeights = new HashMap(); - - // Max concurrent running applications for each queue and for each user; in addition, - // for users that have no max specified, we use the userMaxJobsDefault. - private Map queueMaxApps = new HashMap(); - private Map userMaxApps = new HashMap(); - private int userMaxAppsDefault = Integer.MAX_VALUE; - private int queueMaxAppsDefault = Integer.MAX_VALUE; - - // ACL's for each queue. Only specifies non-default ACL's from configuration. - private Map> queueAcls = - new HashMap>(); - - // Min share preemption timeout for each queue in seconds. If a job in the queue - // waits this long without receiving its guaranteed share, it is allowed to - // preempt other jobs' tasks. - private Map minSharePreemptionTimeouts = - new HashMap(); - - // Default min share preemption timeout for queues where it is not set - // explicitly. - private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - - // Preemption timeout for jobs below fair share in seconds. If a job remains - // below half its fair share for this long, it is allowed to preempt tasks. - private long fairSharePreemptionTimeout = Long.MAX_VALUE; - - SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; - private Object allocFile; // Path to XML file containing allocations. This // is either a URL to specify a classpath resource // (if the fair-scheduler.xml on the classpath is @@ -113,40 +78,12 @@ public class QueueManager { private Map queues = new HashMap(); + private volatile QueueManagerInfo info = new QueueManagerInfo(); + private long lastReloadAttempt; // Last time we tried to reload the queues file private long lastSuccessfulReload; // Last time we successfully reloaded queues private boolean lastReloadAttemptFailed = false; - // Monitor object for minQueueResources - private Object minQueueResourcesMO = new Object(); - - //Monitor object for maxQueueResources - private Object maxQueueResourcesMO = new Object(); - - //Monitor object for queueMaxApps - private Object queueMaxAppsMO = new Object(); - - //Monitor object for userMaxApps - private Object userMaxAppsMO = new Object(); - - //Monitor object for queueWeights - private Object queueWeightsMO = new Object(); - - //Monitor object for minSharePreemptionTimeouts - private Object minSharePreemptionTimeoutsMO = new Object(); - - //Monitor object for queueAcls - private Object queueAclsMO = new Object(); - - //Monitor object for userMaxAppsDefault - private Object userMaxAppsDefaultMO = new Object(); - - //Monitor object for queueMaxAppsDefault - private Object queueMaxAppsDefaultMO = new Object(); - - //Monitor object for defaultSchedulingMode - private Object defaultSchedulingModeMO = new Object(); - public QueueManager(FairScheduler scheduler) { this.scheduler = scheduler; } @@ -180,9 +117,7 @@ public class QueueManager { FSQueue queue = queues.get(name); if (queue == null) { queue = new FSQueue(scheduler, name); - synchronized (defaultSchedulingModeMO){ - queue.setSchedulingMode(defaultSchedulingMode); - } + queue.setSchedulingMode(info.defaultSchedulingMode); queues.put(name, queue); } return queue; @@ -272,6 +207,8 @@ public class QueueManager { new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + long fairSharePreemptionTimeout = Long.MAX_VALUE; + long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; // Remember all queue names so we can display them on web UI, etc. @@ -389,16 +326,10 @@ public class QueueManager { // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. synchronized (this) { - setMinResources(minQueueResources); - setMaxResources(maxQueueResources); - setQueueMaxApps(queueMaxApps); - setUserMaxApps(userMaxApps); - setQueueWeights(queueWeights); - setUserMaxAppsDefault(userMaxAppsDefault); - setQueueMaxAppsDefault(queueMaxAppsDefault); - setDefaultSchedulingMode(defaultSchedulingMode); - setMinSharePreemptionTimeouts(minSharePreemptionTimeouts); - setQueueAcls(queueAcls); + info = new QueueManagerInfo(minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, + queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts, + queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); for (String name: queueNamesInAllocFile) { FSQueue queue = getQueue(name); if (queueModes.containsKey(name)) { @@ -428,182 +359,87 @@ public class QueueManager { * @return the cap set on this queue, or 0 if not set. */ public Resource getMinResources(String queue) { - synchronized(minQueueResourcesMO) { - if (minQueueResources.containsKey(queue)) { - return minQueueResources.get(queue); - } else { - return Resources.createResource(0); - } + Resource minQueueResource = info.minQueueResources.get(queue); + if (minQueueResource != null) { + return minQueueResource; + } else { + return Resources.createResource(0); } } - private void setMinResources(Map resources) { - synchronized (minQueueResourcesMO) { - minQueueResources = resources; - } - } /** * Get the maximum resource allocation for the given queue. * @return the cap set on this queue, or Integer.MAX_VALUE if not set. */ + public Resource getMaxResources(String queueName) { - synchronized (maxQueueResourcesMO) { - if (maxQueueResources.containsKey(queueName)) { - return maxQueueResources.get(queueName); - } else { - return Resources.createResource(Integer.MAX_VALUE); - } + Resource maxQueueResource = info.maxQueueResources.get(queueName); + if (maxQueueResource != null) { + return maxQueueResource; + } else { + return Resources.createResource(Integer.MAX_VALUE); } } - private void setMaxResources(Map resources) { - synchronized (maxQueueResourcesMO) { - maxQueueResources = resources; - } - } - - /** - * Add an app in the appropriate queue - */ - public synchronized void addApp(FSSchedulerApp app) { - getQueue(app.getQueueName()).addApp(app); - } - - /** - * Remove an app - */ - public synchronized void removeApp(FSSchedulerApp app) { - getQueue(app.getQueueName()).removeApp(app); - } - /** * Get a collection of all queues */ public Collection getQueues() { synchronized (queues) { - return queues.values(); + return new ArrayList(queues.values()); } } - /** - * Get all queue names that have been seen either in the allocation file or in - * a submitted app. - */ - public synchronized Collection getQueueNames() { - List list = new ArrayList(); - for (FSQueue queue: getQueues()) { - list.add(queue.getName()); - } - Collections.sort(list); - return list; - } - public int getUserMaxApps(String user) { - synchronized (userMaxAppsMO) { - if (userMaxApps.containsKey(user)) { - return userMaxApps.get(user); - } else { - return getUserMaxAppsDefault(); - } + // save current info in case it gets changed under us + QueueManagerInfo info = this.info; + if (info.userMaxApps.containsKey(user)) { + return info.userMaxApps.get(user); + } else { + return info.userMaxAppsDefault; } } - private void setUserMaxApps(Map userApps) { - synchronized (userMaxAppsMO) { - userMaxApps = userApps; - } - } - - private int getUserMaxAppsDefault() { - synchronized (userMaxAppsDefaultMO){ - return userMaxAppsDefault; - } - } - - private void setUserMaxAppsDefault(int userMaxApps) { - synchronized (userMaxAppsDefaultMO){ - userMaxAppsDefault = userMaxApps; - } - } - public int getQueueMaxApps(String queue) { - synchronized (queueMaxAppsMO) { - if (queueMaxApps.containsKey(queue)) { - return queueMaxApps.get(queue); - } else { - return getQueueMaxAppsDefault(); - } + // save current info in case it gets changed under us + QueueManagerInfo info = this.info; + if (info.queueMaxApps.containsKey(queue)) { + return info.queueMaxApps.get(queue); + } else { + return info.queueMaxAppsDefault; } } - private void setQueueMaxApps(Map queueApps) { - synchronized (queueMaxAppsMO) { - queueMaxApps = queueApps; - } - } - - private int getQueueMaxAppsDefault(){ - synchronized (queueMaxAppsDefaultMO) { - return queueMaxAppsDefault; - } - } - - private void setQueueMaxAppsDefault(int queueMaxApps){ - synchronized(queueMaxAppsDefaultMO) { - queueMaxAppsDefault = queueMaxApps; - } - } - - private void setDefaultSchedulingMode(SchedulingMode schedulingMode){ - synchronized(defaultSchedulingModeMO) { - defaultSchedulingMode = schedulingMode; - } - } - public double getQueueWeight(String queue) { - synchronized (queueWeightsMO) { - if (queueWeights.containsKey(queue)) { - return queueWeights.get(queue); - } else { - return 1.0; - } + Double weight = info.queueWeights.get(queue); + if (weight != null) { + return weight; + } else { + return 1.0; } } - private void setQueueWeights(Map weights) { - synchronized (queueWeightsMO) { - queueWeights = weights; - } - } - /** * Get a queue's min share preemption timeout, in milliseconds. This is the * time after which jobs in the queue may kill other queues' tasks if they * are below their min share. */ public long getMinSharePreemptionTimeout(String queueName) { - synchronized (minSharePreemptionTimeoutsMO) { - if (minSharePreemptionTimeouts.containsKey(queueName)) { - return minSharePreemptionTimeouts.get(queueName); - } + // save current info in case it gets changed under us + QueueManagerInfo info = this.info; + if (info.minSharePreemptionTimeouts.containsKey(queueName)) { + return info.minSharePreemptionTimeouts.get(queueName); } - return defaultMinSharePreemptionTimeout; + return info.defaultMinSharePreemptionTimeout; } - private void setMinSharePreemptionTimeouts( - Map sharePreemptionTimeouts){ - synchronized (minSharePreemptionTimeoutsMO) { - minSharePreemptionTimeouts = sharePreemptionTimeouts; - } - } - /** * Get the fair share preemption, in milliseconds. This is the time * after which any job may kill other jobs' tasks if it is below half * its fair share. */ public long getFairSharePreemptionTimeout() { - return fairSharePreemptionTimeout; + return info.fairSharePreemptionTimeout; } /** @@ -612,10 +448,9 @@ public class QueueManager { */ public Map getQueueAcls(String queue) { HashMap out = new HashMap(); - synchronized (queueAclsMO) { - if (queueAcls.containsKey(queue)) { - out.putAll(queueAcls.get(queue)); - } + Map queueAcl = info.queueAcls.get(queue); + if (queueAcl != null) { + out.putAll(queueAcl); } if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) { out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*")); @@ -626,9 +461,74 @@ public class QueueManager { return out; } - private void setQueueAcls(Map> queue) { - synchronized (queueAclsMO) { - queueAcls = queue; + static class QueueManagerInfo { + // Minimum resource allocation for each queue + public final Map minQueueResources; + // Maximum amount of resources per queue + public final Map maxQueueResources; + // Sharing weights for each queue + public final Map queueWeights; + + // Max concurrent running applications for each queue and for each user; in addition, + // for users that have no max specified, we use the userMaxJobsDefault. + public final Map queueMaxApps; + public final Map userMaxApps; + public final int userMaxAppsDefault; + public final int queueMaxAppsDefault; + + // ACL's for each queue. Only specifies non-default ACL's from configuration. + public final Map> queueAcls; + + // Min share preemption timeout for each queue in seconds. If a job in the queue + // waits this long without receiving its guaranteed share, it is allowed to + // preempt other jobs' tasks. + public final Map minSharePreemptionTimeouts; + + // Default min share preemption timeout for queues where it is not set + // explicitly. + public final long defaultMinSharePreemptionTimeout; + + // Preemption timeout for jobs below fair share in seconds. If a job remains + // below half its fair share for this long, it is allowed to preempt tasks. + public final long fairSharePreemptionTimeout; + + public final SchedulingMode defaultSchedulingMode; + + public QueueManagerInfo(Map minQueueResources, + Map maxQueueResources, + Map queueMaxApps, Map userMaxApps, + Map queueWeights, int userMaxAppsDefault, + int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode, + Map minSharePreemptionTimeouts, + Map> queueAcls, + long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) { + this.minQueueResources = minQueueResources; + this.maxQueueResources = maxQueueResources; + this.queueMaxApps = queueMaxApps; + this.userMaxApps = userMaxApps; + this.queueWeights = queueWeights; + this.userMaxAppsDefault = userMaxAppsDefault; + this.queueMaxAppsDefault = queueMaxAppsDefault; + this.defaultSchedulingMode = defaultSchedulingMode; + this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; + this.queueAcls = queueAcls; + this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; + this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; + } + + public QueueManagerInfo() { + minQueueResources = new HashMap(); + maxQueueResources = new HashMap(); + queueWeights = new HashMap(); + queueMaxApps = new HashMap(); + userMaxApps = new HashMap(); + userMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAppsDefault = Integer.MAX_VALUE; + queueAcls = new HashMap>(); + minSharePreemptionTimeouts = new HashMap(); + defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + fairSharePreemptionTimeout = Long.MAX_VALUE; + defaultSchedulingMode = SchedulingMode.FAIR; } } }