From ebafe075d69fcfa1945c21a29b36b33cd15d7d93 Mon Sep 17 00:00:00 2001
From: Wangda Tan
Date: Tue, 3 Jan 2017 16:21:46 -0800
Subject: [PATCH] YARN-6029. CapacityScheduler deadlock when
 ParentQueue#getQueueUserAclInfo is called by one thread and
 LeafQueue#assignContainers is releasing excessive reserved container is
 called by another thread. (Tao Yang via wangda)

---
 .../scheduler/capacity/LeafQueue.java | 225 ++++++++++--------
 1 file changed, 120 insertions(+), 105 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index eef43c308eb..86e8c09989c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -895,129 +895,144 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(Resource clusterResource,
+  public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
-    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+    FiCaSchedulerApp reservedApp = null;
+    CSAssignment reservedCSAssignment = null;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
+    synchronized (this) {
+      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "assignContainers: node=" + node.getNodeName() + " #applications="
+                + orderingPolicy.getNumSchedulableEntities());
+      }
+
+      setPreemptionAllowed(currentResourceLimits, node.getPartition());
+
+      // Check for reserved resources
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        reservedApp = getApplication(
+            reservedContainer.getApplicationAttemptId());
+        synchronized (reservedApp) {
+          reservedCSAssignment = reservedApp.assignContainers(
+              clusterResource, node, currentResourceLimits, schedulingMode,
+              reservedContainer);
+        }
+      }
     }
 
-    setPreemptionAllowed(currentResourceLimits, node.getPartition());
+    // Handle possible completedContainer out of synchronized lock to avoid
+    // deadlock.
+    if (reservedCSAssignment != null) {
+      handleExcessReservedContainer(clusterResource, reservedCSAssignment, node,
+          reservedApp);
+      killToPreemptContainers(clusterResource, node, reservedCSAssignment);
+      return reservedCSAssignment;
+    }
 
-    // Check for reserved resources
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      FiCaSchedulerApp application =
-          getApplication(reservedContainer.getApplicationAttemptId());
-      synchronized (application) {
+    synchronized (this) {
+      // if our queue cannot access this node, just return
+      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+          && !accessibleToPartition(node.getPartition())) {
+        return CSAssignment.NULL_ASSIGNMENT;
+      }
+
+      // Check if this queue need more resource, simply skip allocation if this
+      // queue doesn't need more resources.
+      if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
+          schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip this queue=" + getQueuePath()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-partition=" + node
+              .getPartition());
+        }
+        return CSAssignment.NULL_ASSIGNMENT;
+      }
+
+      for (Iterator<FiCaSchedulerApp> assignmentIterator =
+          orderingPolicy.getAssignmentIterator(); assignmentIterator
+          .hasNext(); ) {
+        FiCaSchedulerApp application = assignmentIterator.next();
+
+        // Check queue max-capacity limit
+        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+            currentResourceLimits, application.getCurrentReservation(),
+            schedulingMode)) {
+          return CSAssignment.NULL_ASSIGNMENT;
+        }
+
+        Resource userLimit =
+            computeUserLimitAndSetHeadroom(application, clusterResource,
+                node.getPartition(), schedulingMode);
+
+        // Check user limit
+        if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+            application, node.getPartition(), currentResourceLimits)) {
+          application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+              "User capacity has reached its maximum limit.");
+          continue;
+        }
+
+        // Try to schedule
         CSAssignment assignment =
             application.assignContainers(clusterResource, node,
-                currentResourceLimits, schedulingMode, reservedContainer);
+                currentResourceLimits, schedulingMode, null);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("post-assignContainers for application "
+              + application.getApplicationId());
+          application.showRequests();
+        }
+
+        // Did we schedule or reserve a container?
+        Resource assigned = assignment.getResource();
+
         handleExcessReservedContainer(clusterResource, assignment, node,
             application);
         killToPreemptContainers(clusterResource, node, assignment);
-        return assignment;
-      }
-    }
 
-    // if our queue cannot access this node, just return
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(node.getPartition())) {
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
+        if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+            Resources.none())) {
+          // Get reserved or allocated container from application
+          RMContainer reservedOrAllocatedRMContainer =
+              application.getRMContainer(assignment.getAssignmentInformation()
+                  .getFirstAllocatedOrReservedContainerId());
 
-    // Check if this queue need more resource, simply skip allocation if this
-    // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
-        schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + node.getPartition());
-      }
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
+          // Book-keeping
+          // Note: Update headroom to account for current allocation too...
+          allocateResource(clusterResource, application, assigned,
+              node.getPartition(), reservedOrAllocatedRMContainer,
+              assignment.isIncreasedAllocation());
 
-    for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
-      FiCaSchedulerApp application = assignmentIterator.next();
+          // Update reserved metrics
+          Resource reservedRes = assignment.getAssignmentInformation()
+              .getReserved();
+          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
+            incReservedResource(node.getPartition(), reservedRes);
+          }
 
-      // Check queue max-capacity limit
-      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          currentResourceLimits, application.getCurrentReservation(),
-          schedulingMode)) {
-        return CSAssignment.NULL_ASSIGNMENT;
-      }
-
-      Resource userLimit =
-          computeUserLimitAndSetHeadroom(application, clusterResource,
-              node.getPartition(), schedulingMode);
-
-      // Check user limit
-      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-          application, node.getPartition(), currentResourceLimits)) {
-        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
-            "User capacity has reached its maximum limit.");
-        continue;
-      }
-
-      // Try to schedule
-      CSAssignment assignment =
-          application.assignContainers(clusterResource, node,
-              currentResourceLimits, schedulingMode, null);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application "
-            + application.getApplicationId());
-        application.showRequests();
-      }
-
-      // Did we schedule or reserve a container?
-      Resource assigned = assignment.getResource();
-
-      handleExcessReservedContainer(clusterResource, assignment, node,
-          application);
-      killToPreemptContainers(clusterResource, node, assignment);
-
-      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-          Resources.none())) {
-        // Get reserved or allocated container from application
-        RMContainer reservedOrAllocatedRMContainer =
-            application.getRMContainer(assignment.getAssignmentInformation()
-                .getFirstAllocatedOrReservedContainerId());
-
-        // Book-keeping
-        // Note: Update headroom to account for current allocation too...
-        allocateResource(clusterResource, application, assigned,
-            node.getPartition(), reservedOrAllocatedRMContainer,
-            assignment.isIncreasedAllocation());
-
-        // Update reserved metrics
-        Resource reservedRes = assignment.getAssignmentInformation()
-            .getReserved();
-        if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-          incReservedResource(node.getPartition(), reservedRes);
+          // Done
+          return assignment;
+        } else if (assignment.getSkippedType()
+            == CSAssignment.SkippedType.OTHER) {
+          application.updateNodeInfoForAMDiagnostics(node);
+        } else if(assignment.getSkippedType()
+            == CSAssignment.SkippedType.QUEUE_LIMIT) {
+          return assignment;
+        } else {
+          // If we don't allocate anything, and it is not skipped by application,
+          // we will return to respect FIFO of applications
+          return CSAssignment.NULL_ASSIGNMENT;
         }
-
-        // Done
-        return assignment;
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.OTHER) {
-        application.updateNodeInfoForAMDiagnostics(node);
-      } else if(assignment.getSkippedType()
-          == CSAssignment.SkippedType.QUEUE_LIMIT) {
-        return assignment;
-      } else {
-        // If we don't allocate anything, and it is not skipped by application,
-        // we will return to respect FIFO of applications
-        return CSAssignment.NULL_ASSIGNMENT;
       }
-    }
 
-    return CSAssignment.NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
   }
 
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
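Reviewer note, not part of the patch: the change is a lock-splitting pattern. The reserved-container assignment is computed inside the first synchronized (this) block, the queue lock is released, and only then are handleExcessReservedContainer/killToPreemptContainers run, so the scheduler thread never holds the LeafQueue monitor while that call path takes further locks that another thread (for example one walking the queue tree via ParentQueue#getQueueUserAclInfo) may already hold in the opposite order. The standalone sketch below illustrates only that pattern; LockSplitSketch, Result and the helper methods are made-up names, not YARN classes.

/** Illustrative sketch of the lock-splitting pattern used by the patch;
 *  the names here are placeholders, not real YARN types. */
public class LockSplitSketch {

  private final Object queueLock = new Object();

  /** Stand-in for the scheduler's assignment result type. */
  static final class Result {
    final String detail;
    Result(String detail) { this.detail = detail; }
  }

  Result assignContainers(String node) {
    Result reservedResult = null;

    synchronized (queueLock) {
      // Step 1: do only the work that needs the queue lock and remember the
      // outcome instead of acting on it while still holding the lock.
      if (hasReservationOn(node)) {
        reservedResult = new Result("re-assigned reservation on " + node);
      }
    }

    // Step 2: the follow-up that may take other locks (handling a completed or
    // excess reserved container) runs with no lock held, so it cannot close a
    // lock-ordering cycle with a thread that already holds those locks.
    if (reservedResult != null) {
      releaseExcessReservation(reservedResult);
      return reservedResult;
    }

    synchronized (queueLock) {
      // Step 3: the normal allocation path, again under the queue lock only.
      return new Result("allocated new container on " + node);
    }
  }

  private boolean hasReservationOn(String node) {
    // Placeholder decision so the example runs end to end.
    return node.length() % 2 == 0;
  }

  private void releaseExcessReservation(Result result) {
    System.out.println("releasing excess reservation: " + result.detail);
  }

  public static void main(String[] args) {
    System.out.println(new LockSplitSketch().assignContainers("node-1").detail);
  }
}

The essential property, in the sketch as in the patch, is that no thread ever holds two monitors at once on this path, which removes the lock-ordering cycle described in the commit subject.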