Merge -c 1370889 from trunk to branch-2 to fix YARN-12. Fix findbugs warnings in FairScheduler. Contributed by Junping Du.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1370890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
221e491f5d
commit
9337abd0b0
|
@ -32,3 +32,5 @@ Release 2.1.0-alpha - Unreleased
|
|||
|
||||
BUG FIXES
|
||||
|
||||
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
|
||||
|
||||
|
|
|
@ -129,15 +129,17 @@ class FairSchedulerEventLog {
|
|||
/**
|
||||
* Flush and close the log.
|
||||
*/
|
||||
void shutdown() {
|
||||
synchronized void shutdown() {
|
||||
try {
|
||||
if (appender != null)
|
||||
appender.close();
|
||||
} catch (Exception e) {}
|
||||
logDisabled = true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to close fair scheduler event log", e);
|
||||
logDisabled = true;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isEnabled() {
|
||||
synchronized boolean isEnabled() {
|
||||
return !logDisabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,6 +117,36 @@ public class QueueManager {
|
|||
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
||||
private boolean lastReloadAttemptFailed = false;
|
||||
|
||||
// Monitor object for minQueueResources
|
||||
private Object minQueueResourcesMO = new Object();
|
||||
|
||||
//Monitor object for maxQueueResources
|
||||
private Object maxQueueResourcesMO = new Object();
|
||||
|
||||
//Monitor object for queueMaxApps
|
||||
private Object queueMaxAppsMO = new Object();
|
||||
|
||||
//Monitor object for userMaxApps
|
||||
private Object userMaxAppsMO = new Object();
|
||||
|
||||
//Monitor object for queueWeights
|
||||
private Object queueWeightsMO = new Object();
|
||||
|
||||
//Monitor object for minSharePreemptionTimeouts
|
||||
private Object minSharePreemptionTimeoutsMO = new Object();
|
||||
|
||||
//Monitor object for queueAcls
|
||||
private Object queueAclsMO = new Object();
|
||||
|
||||
//Monitor object for userMaxAppsDefault
|
||||
private Object userMaxAppsDefaultMO = new Object();
|
||||
|
||||
//Monitor object for queueMaxAppsDefault
|
||||
private Object queueMaxAppsDefaultMO = new Object();
|
||||
|
||||
//Monitor object for defaultSchedulingMode
|
||||
private Object defaultSchedulingModeMO = new Object();
|
||||
|
||||
public QueueManager(FairScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
@ -145,21 +175,27 @@ public class QueueManager {
|
|||
/**
|
||||
* Get a queue by name, creating it if necessary
|
||||
*/
|
||||
public synchronized FSQueue getQueue(String name) {
|
||||
FSQueue queue = queues.get(name);
|
||||
if (queue == null) {
|
||||
queue = new FSQueue(scheduler, name);
|
||||
queue.setSchedulingMode(defaultSchedulingMode);
|
||||
queues.put(name, queue);
|
||||
public FSQueue getQueue(String name) {
|
||||
synchronized (queues) {
|
||||
FSQueue queue = queues.get(name);
|
||||
if (queue == null) {
|
||||
queue = new FSQueue(scheduler, name);
|
||||
synchronized (defaultSchedulingModeMO){
|
||||
queue.setSchedulingMode(defaultSchedulingMode);
|
||||
}
|
||||
queues.put(name, queue);
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return whether a queue exists already.
|
||||
*/
|
||||
public synchronized boolean exists(String name) {
|
||||
return queues.containsKey(name);
|
||||
public boolean exists(String name) {
|
||||
synchronized (queues) {
|
||||
return queues.containsKey(name);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -353,16 +389,16 @@ public class QueueManager {
|
|||
// Commit the reload; also create any queue defined in the alloc file
|
||||
// if it does not already exist, so it can be displayed on the web UI.
|
||||
synchronized(this) {
|
||||
this.minQueueResources = minQueueResources;
|
||||
this.maxQueueResources = maxQueueResources;
|
||||
this.queueMaxApps = queueMaxApps;
|
||||
this.userMaxApps = userMaxApps;
|
||||
this.queueWeights = queueWeights;
|
||||
this.userMaxAppsDefault = userMaxAppsDefault;
|
||||
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
||||
this.defaultSchedulingMode = defaultSchedulingMode;
|
||||
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
||||
this.queueAcls = queueAcls;
|
||||
setMinResources(minQueueResources);
|
||||
setMaxResources(maxQueueResources);
|
||||
setQueueMaxApps(queueMaxApps);
|
||||
setUserMaxApps(userMaxApps);
|
||||
setQueueWeights(queueWeights);
|
||||
setUserMaxAppsDefault(userMaxAppsDefault);
|
||||
setQueueMaxAppsDefault(queueMaxAppsDefault);
|
||||
setDefaultSchedulingMode(defaultSchedulingMode);
|
||||
setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
|
||||
setQueueAcls(queueAcls);
|
||||
for (String name: queueNamesInAllocFile) {
|
||||
FSQueue queue = getQueue(name);
|
||||
if (queueModes.containsKey(name)) {
|
||||
|
@ -392,22 +428,37 @@ public class QueueManager {
|
|||
* @return the cap set on this queue, or 0 if not set.
|
||||
*/
|
||||
public Resource getMinResources(String queue) {
|
||||
if (minQueueResources.containsKey(queue)) {
|
||||
return minQueueResources.get(queue);
|
||||
} else{
|
||||
return Resources.createResource(0);
|
||||
synchronized(minQueueResourcesMO) {
|
||||
if (minQueueResources.containsKey(queue)) {
|
||||
return minQueueResources.get(queue);
|
||||
} else{
|
||||
return Resources.createResource(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setMinResources(Map<String, Resource> resources) {
|
||||
synchronized(minQueueResourcesMO) {
|
||||
minQueueResources = resources;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Get the maximum resource allocation for the given queue.
|
||||
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
|
||||
*/
|
||||
Resource getMaxResources(String queueName) {
|
||||
if (maxQueueResources.containsKey(queueName)) {
|
||||
return maxQueueResources.get(queueName);
|
||||
} else {
|
||||
return Resources.createResource(Integer.MAX_VALUE);
|
||||
synchronized (maxQueueResourcesMO) {
|
||||
if (maxQueueResources.containsKey(queueName)) {
|
||||
return maxQueueResources.get(queueName);
|
||||
} else {
|
||||
return Resources.createResource(Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setMaxResources(Map<String, Resource> resources) {
|
||||
synchronized(maxQueueResourcesMO) {
|
||||
maxQueueResources = resources;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,11 +479,12 @@ public class QueueManager {
|
|||
/**
|
||||
* Get a collection of all queues
|
||||
*/
|
||||
public synchronized Collection<FSQueue> getQueues() {
|
||||
return queues.values();
|
||||
public Collection<FSQueue> getQueues() {
|
||||
synchronized (queues) {
|
||||
return queues.values();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get all queue names that have been seen either in the allocation file or in
|
||||
* a submitted app.
|
||||
|
@ -447,41 +499,103 @@ public class QueueManager {
|
|||
}
|
||||
|
||||
public int getUserMaxApps(String user) {
|
||||
if (userMaxApps.containsKey(user)) {
|
||||
return userMaxApps.get(user);
|
||||
} else {
|
||||
synchronized (userMaxAppsMO) {
|
||||
if (userMaxApps.containsKey(user)) {
|
||||
return userMaxApps.get(user);
|
||||
} else {
|
||||
return getUserMaxAppsDefault();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setUserMaxApps(Map<String, Integer> userApps) {
|
||||
synchronized (userMaxAppsMO) {
|
||||
userMaxApps = userApps;
|
||||
}
|
||||
}
|
||||
|
||||
private int getUserMaxAppsDefault() {
|
||||
synchronized (userMaxAppsDefaultMO){
|
||||
return userMaxAppsDefault;
|
||||
}
|
||||
}
|
||||
|
||||
private void setUserMaxAppsDefault(int userMaxApps) {
|
||||
synchronized (userMaxAppsDefaultMO){
|
||||
userMaxAppsDefault = userMaxApps;
|
||||
}
|
||||
}
|
||||
|
||||
public int getQueueMaxApps(String queue) {
|
||||
if (queueMaxApps.containsKey(queue)) {
|
||||
return queueMaxApps.get(queue);
|
||||
} else {
|
||||
synchronized (queueMaxAppsMO) {
|
||||
if (queueMaxApps.containsKey(queue)) {
|
||||
return queueMaxApps.get(queue);
|
||||
} else {
|
||||
return getQueueMaxAppsDefault();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setQueueMaxApps(Map<String, Integer> queueApps) {
|
||||
synchronized (queueMaxAppsMO) {
|
||||
queueMaxApps = queueApps;
|
||||
}
|
||||
}
|
||||
|
||||
private int getQueueMaxAppsDefault(){
|
||||
synchronized(queueMaxAppsDefaultMO) {
|
||||
return queueMaxAppsDefault;
|
||||
}
|
||||
}
|
||||
|
||||
public double getQueueWeight(String queue) {
|
||||
if (queueWeights.containsKey(queue)) {
|
||||
return queueWeights.get(queue);
|
||||
} else {
|
||||
return 1.0;
|
||||
private void setQueueMaxAppsDefault(int queueMaxApps){
|
||||
synchronized(queueMaxAppsDefaultMO) {
|
||||
queueMaxAppsDefault = queueMaxApps;
|
||||
}
|
||||
}
|
||||
|
||||
private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
|
||||
synchronized(defaultSchedulingModeMO) {
|
||||
defaultSchedulingMode = schedulingMode;
|
||||
}
|
||||
}
|
||||
|
||||
public double getQueueWeight(String queue) {
|
||||
synchronized (queueWeightsMO) {
|
||||
if (queueWeights.containsKey(queue)) {
|
||||
return queueWeights.get(queue);
|
||||
} else {
|
||||
return 1.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setQueueWeights(Map<String, Double> weights) {
|
||||
synchronized (queueWeightsMO) {
|
||||
queueWeights = weights;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Get a queue's min share preemption timeout, in milliseconds. This is the
|
||||
* time after which jobs in the queue may kill other queues' tasks if they
|
||||
* are below their min share.
|
||||
*/
|
||||
public long getMinSharePreemptionTimeout(String queueName) {
|
||||
if (minSharePreemptionTimeouts.containsKey(queueName)) {
|
||||
return minSharePreemptionTimeouts.get(queueName);
|
||||
synchronized (minSharePreemptionTimeoutsMO) {
|
||||
if (minSharePreemptionTimeouts.containsKey(queueName)) {
|
||||
return minSharePreemptionTimeouts.get(queueName);
|
||||
}
|
||||
}
|
||||
return defaultMinSharePreemptionTimeout;
|
||||
}
|
||||
|
||||
private void setMinSharePreemptionTimeouts(
|
||||
Map<String, Long> sharePreemptionTimeouts){
|
||||
synchronized (minSharePreemptionTimeoutsMO) {
|
||||
minSharePreemptionTimeouts = sharePreemptionTimeouts;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the fair share preemption, in milliseconds. This is the time
|
||||
* after which any job may kill other jobs' tasks if it is below half
|
||||
|
@ -497,9 +611,10 @@ public class QueueManager {
|
|||
*/
|
||||
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
|
||||
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
|
||||
|
||||
if (queueAcls.containsKey(queue)) {
|
||||
out.putAll(queueAcls.get(queue));
|
||||
synchronized (queueAclsMO) {
|
||||
if (queueAcls.containsKey(queue)) {
|
||||
out.putAll(queueAcls.get(queue));
|
||||
}
|
||||
}
|
||||
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
|
||||
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
|
||||
|
@ -509,4 +624,10 @@ public class QueueManager {
|
|||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
|
||||
synchronized (queueAclsMO) {
|
||||
queueAcls = queue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue