YARN-6029. CapacityScheduler deadlock when ParentQueue#getQueueUserAclInfo is called by one thread while LeafQueue#assignContainers, which is releasing an excess reserved container, is called by another thread. (Tao Yang via wangda)

This commit is contained in:
Wangda Tan 2017-01-03 16:21:46 -08:00
parent a58a1b55bf
commit ebafe075d6
1 changed files with 120 additions and 105 deletions

View File

@ -895,129 +895,144 @@ public class LeafQueue extends AbstractCSQueue {
}
/**
 * Tries to assign containers on {@code node} for this leaf queue and returns
 * the resulting {@link CSAssignment}, or {@code CSAssignment.NULL_ASSIGNMENT}
 * when nothing is assigned.
 *
 * NOTE(review): this listing appears to be a diff hunk whose removed and added
 * lines are interleaved without +/- markers (two signature lines, repeated
 * partition / pending-request checks, two application loops, two final
 * returns), so it does not compile as shown. Presumably the
 * {@code synchronized} method signature is the removed side and the explicit
 * {@code synchronized (this)} blocks are the added side -- confirm against the
 * real diff before relying on the notes below.
 *
 * Flow visible in the variant that uses explicit {@code synchronized (this)}
 * blocks:
 *
 * 1. While holding this queue's monitor: call updateCurrentResourceLimits and
 *    setPreemptionAllowed, and, if the node has a reserved container, let the
 *    application that owns it run assignContainers for that container.
 * 2. After leaving the monitor: if step 1 produced an assignment, call
 *    handleExcessReservedContainer and killToPreemptContainers, then return
 *    that assignment. The existing comment states this is done outside the
 *    lock to avoid deadlock.
 * 3. While holding this queue's monitor again: return NULL_ASSIGNMENT if the
 *    scheduling mode is RESPECT_PARTITION_EXCLUSIVITY and the node's partition
 *    is not accessible, or if hasPendingResourceRequest is false; otherwise
 *    walk the applications supplied by the ordering policy, check the queue
 *    limit and the user limit, and ask each application to assign containers.
 *
 * @param clusterResource resource value passed through to the limit checks
 *        and to the applications' assignContainers calls
 * @param node node on which containers may be assigned or are reserved
 * @param currentResourceLimits limits passed to updateCurrentResourceLimits
 *        and to the limit checks
 * @param schedulingMode mode used for the partition-exclusivity check and
 *        passed to the applications
 * @return the assignment produced by an application, or
 *         {@code CSAssignment.NULL_ASSIGNMENT}
 */
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
FiCaSchedulerApp reservedApp = null;
CSAssignment reservedCSAssignment = null;
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " #applications=" + orderingPolicy.getNumSchedulableEntities());
// Limit updates and the reserved-container assignment below run while
// holding this queue's monitor.
synchronized (this) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if (LOG.isDebugEnabled()) {
LOG.debug(
"assignContainers: node=" + node.getNodeName() + " #applications="
+ orderingPolicy.getNumSchedulableEntities());
}
setPreemptionAllowed(currentResourceLimits, node.getPartition());
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
reservedApp = getApplication(
reservedContainer.getApplicationAttemptId());
// NOTE(review): reservedApp is used as a monitor without a null check;
// confirm getApplication(...) cannot return null for a reserved container.
synchronized (reservedApp) {
reservedCSAssignment = reservedApp.assignContainers(
clusterResource, node, currentResourceLimits, schedulingMode,
reservedContainer);
}
}
}
setPreemptionAllowed(currentResourceLimits, node.getPartition());
// Handle possible completedContainer out of synchronized lock to avoid
// deadlock.
if (reservedCSAssignment != null) {
handleExcessReservedContainer(clusterResource, reservedCSAssignment, node,
reservedApp);
killToPreemptContainers(clusterResource, node, reservedCSAssignment);
return reservedCSAssignment;
}
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) {
// Allocation path for nodes without a reserved-container assignment; it
// runs under this queue's monitor.
synchronized (this) {
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
return CSAssignment.NULL_ASSIGNMENT;
}
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node
.getPartition());
}
return CSAssignment.NULL_ASSIGNMENT;
}
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator(); assignmentIterator
.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
currentResourceLimits, application.getCurrentReservation(),
schedulingMode)) {
return CSAssignment.NULL_ASSIGNMENT;
}
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
node.getPartition(), schedulingMode);
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, node.getPartition(), currentResourceLimits)) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
// This application is skipped; the loop moves on to the next one.
continue;
}
// Try to schedule
CSAssignment assignment =
application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode, reservedContainer);
currentResourceLimits, schedulingMode, null);
if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application "
+ application.getApplicationId());
application.showRequests();
}
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
handleExcessReservedContainer(clusterResource, assignment, node,
application);
killToPreemptContainers(clusterResource, node, assignment);
return assignment;
}
}
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
return CSAssignment.NULL_ASSIGNMENT;
}
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
// Get reserved or allocated container from application
RMContainer reservedOrAllocatedRMContainer =
application.getRMContainer(assignment.getAssignmentInformation()
.getFirstAllocatedOrReservedContainerId());
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
return CSAssignment.NULL_ASSIGNMENT;
}
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
node.getPartition(), reservedOrAllocatedRMContainer,
assignment.isIncreasedAllocation());
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
FiCaSchedulerApp application = assignmentIterator.next();
// Update reserved metrics
Resource reservedRes = assignment.getAssignmentInformation()
.getReserved();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
incReservedResource(node.getPartition(), reservedRes);
}
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
currentResourceLimits, application.getCurrentReservation(),
schedulingMode)) {
return CSAssignment.NULL_ASSIGNMENT;
}
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
node.getPartition(), schedulingMode);
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, node.getPartition(), currentResourceLimits)) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
continue;
}
// Try to schedule
CSAssignment assignment =
application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode, null);
if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application "
+ application.getApplicationId());
application.showRequests();
}
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
handleExcessReservedContainer(clusterResource, assignment, node,
application);
killToPreemptContainers(clusterResource, node, assignment);
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
// Get reserved or allocated container from application
RMContainer reservedOrAllocatedRMContainer =
application.getRMContainer(assignment.getAssignmentInformation()
.getFirstAllocatedOrReservedContainerId());
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
node.getPartition(), reservedOrAllocatedRMContainer,
assignment.isIncreasedAllocation());
// Update reserved metrics
Resource reservedRes = assignment.getAssignmentInformation()
.getReserved();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
incReservedResource(node.getPartition(), reservedRes);
// Done
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
application.updateNodeInfoForAMDiagnostics(node);
} else if(assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
return assignment;
} else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
return CSAssignment.NULL_ASSIGNMENT;
}
// Done
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
application.updateNodeInfoForAMDiagnostics(node);
} else if(assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
return assignment;
} else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
return CSAssignment.NULL_ASSIGNMENT;
}
}
return CSAssignment.NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
}
protected Resource getHeadroom(User user, Resource queueCurrentLimit,