YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
(cherry picked from commit edb9cd0f7a
)
This commit is contained in:
parent
5ecc647ae0
commit
62d51b889a
|
@ -434,6 +434,8 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via
|
||||
zjshen)
|
||||
|
||||
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,6 +23,9 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue {
|
|||
private static final Log LOG = LogFactory.getLog(
|
||||
FSParentQueue.class.getName());
|
||||
|
||||
private final List<FSQueue> childQueues =
|
||||
new ArrayList<FSQueue>();
|
||||
private final List<FSQueue> childQueues = new ArrayList<>();
|
||||
private Resource demand = Resources.createResource(0);
|
||||
private int runnableApps;
|
||||
|
||||
|
||||
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
private Lock readLock = rwLock.readLock();
|
||||
private Lock writeLock = rwLock.writeLock();
|
||||
|
||||
public FSParentQueue(String name, FairScheduler scheduler,
|
||||
FSParentQueue parent) {
|
||||
super(name, scheduler, parent);
|
||||
}
|
||||
|
||||
public void addChildQueue(FSQueue child) {
|
||||
childQueues.add(child);
|
||||
writeLock.lock();
|
||||
try {
|
||||
childQueues.add(child);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void removeChildQueue(FSQueue child) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
childQueues.remove(child);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recomputeShares() {
|
||||
policy.computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||
childQueue.recomputeShares();
|
||||
readLock.lock();
|
||||
try {
|
||||
policy.computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||
childQueue.recomputeShares();
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void recomputeSteadyShares() {
|
||||
policy.computeSteadyShares(childQueues, getSteadyFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
|
||||
if (childQueue instanceof FSParentQueue) {
|
||||
((FSParentQueue) childQueue).recomputeSteadyShares();
|
||||
readLock.lock();
|
||||
try {
|
||||
policy.computeSteadyShares(childQueues, getSteadyFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics()
|
||||
.setSteadyFairShare(childQueue.getSteadyFairShare());
|
||||
if (childQueue instanceof FSParentQueue) {
|
||||
((FSParentQueue) childQueue).recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,21 +112,37 @@ public class FSParentQueue extends FSQueue {
|
|||
public void updatePreemptionVariables() {
|
||||
super.updatePreemptionVariables();
|
||||
// For child queues
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updatePreemptionVariables();
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updatePreemptionVariables();
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getDemand() {
|
||||
return demand;
|
||||
readLock.lock();
|
||||
try {
|
||||
return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResourceUsage() {
|
||||
Resource usage = Resources.createResource(0);
|
||||
for (FSQueue child : childQueues) {
|
||||
Resources.addTo(usage, child.getResourceUsage());
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
Resources.addTo(usage, child.getResourceUsage());
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return usage;
|
||||
}
|
||||
|
@ -106,20 +153,25 @@ public class FSParentQueue extends FSQueue {
|
|||
// Limit demand to maxResources
|
||||
Resource maxRes = scheduler.getAllocationConfiguration()
|
||||
.getMaxResources(getName());
|
||||
demand = Resources.createResource(0);
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updateDemand();
|
||||
Resource toAdd = childQueue.getDemand();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
||||
toAdd + "; Total resource consumption for " + getName() +
|
||||
" now " + demand);
|
||||
}
|
||||
demand = Resources.add(demand, toAdd);
|
||||
demand = Resources.componentwiseMin(demand, maxRes);
|
||||
if (Resources.equals(demand, maxRes)) {
|
||||
break;
|
||||
writeLock.lock();
|
||||
try {
|
||||
demand = Resources.createResource(0);
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updateDemand();
|
||||
Resource toAdd = childQueue.getDemand();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
||||
toAdd + "; Total resource consumption for " + getName() +
|
||||
" now " + demand);
|
||||
}
|
||||
demand = Resources.add(demand, toAdd);
|
||||
demand = Resources.componentwiseMin(demand, maxRes);
|
||||
if (Resources.equals(demand, maxRes)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The updated demand for " + getName() + " is " + demand +
|
||||
|
@ -127,33 +179,31 @@ public class FSParentQueue extends FSQueue {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized QueueUserACLInfo getUserAclInfo(
|
||||
UserGroupInformation user) {
|
||||
QueueUserACLInfo userAclInfo =
|
||||
recordFactory.newRecordInstance(QueueUserACLInfo.class);
|
||||
List<QueueACL> operations = new ArrayList<QueueACL>();
|
||||
private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
|
||||
List<QueueACL> operations = new ArrayList<>();
|
||||
for (QueueACL operation : QueueACL.values()) {
|
||||
if (hasAccess(operation, user)) {
|
||||
operations.add(operation);
|
||||
}
|
||||
}
|
||||
|
||||
userAclInfo.setQueueName(getQueueName());
|
||||
userAclInfo.setUserAcls(operations);
|
||||
return userAclInfo;
|
||||
return QueueUserACLInfo.newInstance(getQueueName(), operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
|
||||
UserGroupInformation user) {
|
||||
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
|
||||
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
|
||||
|
||||
// Add queue acls
|
||||
userAcls.add(getUserAclInfo(user));
|
||||
|
||||
// Add children queue acls
|
||||
for (FSQueue child : childQueues) {
|
||||
userAcls.addAll(child.getQueueUserAclInfo(user));
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
userAcls.addAll(child.getQueueUserAclInfo(user));
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
return userAcls;
|
||||
|
@ -168,12 +218,32 @@ public class FSParentQueue extends FSQueue {
|
|||
return assigned;
|
||||
}
|
||||
|
||||
Collections.sort(childQueues, policy.getComparator());
|
||||
for (FSQueue child : childQueues) {
|
||||
assigned = child.assignContainer(node);
|
||||
if (!Resources.equals(assigned, Resources.none())) {
|
||||
break;
|
||||
// Hold the write lock when sorting childQueues
|
||||
writeLock.lock();
|
||||
try {
|
||||
Collections.sort(childQueues, policy.getComparator());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
/*
|
||||
* We are releasing the lock between the sort and iteration of the
|
||||
* "sorted" list. There could be changes to the list here:
|
||||
* 1. Add a child queue to the end of the list, this doesn't affect
|
||||
* container assignment.
|
||||
* 2. Remove a child queue, this is probably good to take care of so we
|
||||
* don't assign to a queue that is going to be removed shortly.
|
||||
*/
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
assigned = child.assignContainer(node);
|
||||
if (!Resources.equals(assigned, Resources.none())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return assigned;
|
||||
}
|
||||
|
@ -185,11 +255,17 @@ public class FSParentQueue extends FSQueue {
|
|||
// Find the childQueue which is most over fair share
|
||||
FSQueue candidateQueue = null;
|
||||
Comparator<Schedulable> comparator = policy.getComparator();
|
||||
for (FSQueue queue : childQueues) {
|
||||
if (candidateQueue == null ||
|
||||
comparator.compare(queue, candidateQueue) > 0) {
|
||||
candidateQueue = queue;
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue queue : childQueues) {
|
||||
if (candidateQueue == null ||
|
||||
comparator.compare(queue, candidateQueue) > 0) {
|
||||
candidateQueue = queue;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
// Let the selected queue choose which of its container to preempt
|
||||
|
@ -201,7 +277,12 @@ public class FSParentQueue extends FSQueue {
|
|||
|
||||
@Override
|
||||
public List<FSQueue> getChildQueues() {
|
||||
return childQueues;
|
||||
readLock.lock();
|
||||
try {
|
||||
return Collections.unmodifiableList(childQueues);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -218,23 +299,43 @@ public class FSParentQueue extends FSQueue {
|
|||
}
|
||||
|
||||
public void incrementRunnableApps() {
|
||||
runnableApps++;
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps++;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void decrementRunnableApps() {
|
||||
runnableApps--;
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps--;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumRunnableApps() {
|
||||
return runnableApps;
|
||||
readLock.lock();
|
||||
try {
|
||||
return runnableApps;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectSchedulerApplications(
|
||||
Collection<ApplicationAttemptId> apps) {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.collectSchedulerApplications(apps);
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.collectSchedulerApplications(apps);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -304,7 +304,8 @@ public class QueueManager {
|
|||
}
|
||||
}
|
||||
queues.remove(queue.getName());
|
||||
queue.getParent().getChildQueues().remove(queue);
|
||||
FSParentQueue parent = queue.getParent();
|
||||
parent.removeChildQueue(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue