diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b2f25cd52f6..e15fdf2c0bb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -8,6 +8,9 @@ Release 2.8.0 - UNRELEASED IMPROVEMENTS + YARN-3243. CapacityScheduler should pass headroom from parent to children + to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index d8007097a20..4e53060beb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -20,10 +20,13 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; @@ -49,6 +53,7 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); CSQueue parent; final String queueName; @@ -406,21 +411,102 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) { parentQ.getPreemptionDisabled()); } - protected Resource getCurrentResourceLimit(Resource clusterResource, - ResourceLimits currentResourceLimits) { + private Resource getCurrentLimitResource(String nodeLabel, + Resource clusterResource, ResourceLimits currentResourceLimits) { /* - * Queue's max available resource = min(my.max, my.limit) - * my.limit is set by my parent, considered used resource of my siblings + * Current limit resource: For labeled resource: limit = queue-max-resource + * (TODO, this part need update when we support labeled-limit) For + * non-labeled resource: limit = min(queue-max-resource, + * limit-set-by-parent) */ Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource, - queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); - Resource queueCurrentResourceLimit = - Resources.min(resourceCalculator, clusterResource, queueMaxResource, - currentResourceLimits.getLimit()); - queueCurrentResourceLimit = - Resources.roundDown(resourceCalculator, queueCurrentResourceLimit, - minimumAllocation); - return queueCurrentResourceLimit; + Resources.multiplyAndNormalizeDown(resourceCalculator, + labelManager.getResourceByLabel(nodeLabel, clusterResource), + queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation); + if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) { + return Resources.min(resourceCalculator, clusterResource, + queueMaxResource, currentResourceLimits.getLimit()); + } + return queueMaxResource; + } + + synchronized boolean canAssignToThisQueue(Resource clusterResource, + Set nodeLabels, ResourceLimits currentResourceLimits, + Resource nowRequired, Resource resourceCouldBeUnreserved) { + // Get label of this queue can access, it's (nodeLabel AND queueLabel) + Set labelCanAccess; + if (null == nodeLabels || nodeLabels.isEmpty()) { + labelCanAccess = new HashSet(); + // Any queue can always access any node without label + labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); + } else { + labelCanAccess = new HashSet( + accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels + : Sets.intersection(accessibleLabels, nodeLabels)); + } + + for (String label : labelCanAccess) { + // New total resource = used + required + Resource newTotalResource = + Resources.add(queueUsage.getUsed(label), nowRequired); + + Resource currentLimitResource = + getCurrentLimitResource(label, clusterResource, currentResourceLimits); + + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && label.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + return true; + } + } + + // Otherwise, if any of the label of this node beyond queue limit, we + // cannot allocate on this node. Consider a small epsilon here. + if (Resources.greaterThan(resourceCalculator, clusterResource, + newTotalResource, currentLimitResource)) { + return false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, label=" + label + + " usedResources: " + queueUsage.getUsed(label) + + " clusterResources: " + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(label) + + ")"); + } + return true; + } + + // Actually, this will not happen, since labelCanAccess will be always + // non-empty + return false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 0a60acc9fdc..1a9448acaa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -189,13 +189,11 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. * @param node node on which resources are available - * @param needToUnreserve assign container only if it can unreserve one first * @param resourceLimits how much overall resource of this queue can use. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits resourceLimits); + FiCaSchedulerNode node, ResourceLimits resourceLimits); /** * A container assigned to the queue has completed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 756e53748c3..c86c0ff0cd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1061,9 +1061,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = queue.assignContainers(clusterResource, node, - false, new ResourceLimits( - clusterResource)); + CSAssignment assignment = + queue.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource))); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -1087,8 +1092,13 @@ false, new ResourceLimits( LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node, false, new ResourceLimits( - clusterResource)); + root.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource))); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1209,6 +1219,13 @@ private synchronized void addNode(RMNode nodeManager) { usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + + // update this node to node label manager + if (labelManager != null) { + labelManager.activateNode(nodeManager.getNodeID(), + nodeManager.getTotalCapability()); + } + root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = numNodeManagers.incrementAndGet(); @@ -1220,12 +1237,6 @@ private synchronized void addNode(RMNode nodeManager) { if (scheduleAsynchronously && numNodes == 1) { asyncSchedulerThread.beginSchedule(); } - - // update this node to node label manager - if (labelManager != null) { - labelManager.activateNode(nodeManager.getNodeID(), - nodeManager.getTotalCapability()); - } } private synchronized void removeNode(RMNode nodeInfo) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a607a6221f6..dd6a8946875 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; @Private @Unstable @@ -157,7 +156,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -739,9 +738,8 @@ private static Set getRequestLabelSetByExpression( @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits currentResourceLimits) { - this.currentResourceLimits = currentResourceLimits; + FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { + updateCurrentResourceLimits(currentResourceLimits, clusterResource); if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -796,7 +794,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, continue; } if (!this.reservationsContinueLooking) { - if (!needContainers(application, priority, required)) { + if (!shouldAllocOrReserveNewContainer(application, priority, required)) { if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } @@ -818,8 +816,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, required, requestedNodeLabels); // Check queue max-capacity limit - if (!canAssignToThisQueue(clusterResource, required, - node.getLabels(), application, true)) { + if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), + this.currentResourceLimits, required, application.getCurrentReservation())) { return NULL_ASSIGNMENT; } @@ -835,7 +833,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null, needToUnreserve); + null); // Did the application skip this node? if (assignment.getSkipped()) { @@ -896,7 +894,7 @@ private synchronized CSAssignment assignReservedContainer( // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer, false); + rmContainer); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -938,102 +936,14 @@ private Resource getHeadroom(User user, Resource currentResourceLimit, Resources.roundDown(resourceCalculator, headroom, minimumAllocation); return headroom; } - - synchronized boolean canAssignToThisQueue(Resource clusterResource, - Resource required, Set nodeLabels, FiCaSchedulerApp application, - boolean checkReservations) { - // Get label of this queue can access, it's (nodeLabel AND queueLabel) - Set labelCanAccess; - if (null == nodeLabels || nodeLabels.isEmpty()) { - labelCanAccess = new HashSet(); - // Any queue can always access any node without label - labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); - } else { - labelCanAccess = new HashSet(Sets.intersection(accessibleLabels, nodeLabels)); - } - - boolean canAssign = true; - for (String label : labelCanAccess) { - Resource potentialTotalCapacity = - Resources.add(queueUsage.getUsed(label), required); - - float potentialNewCapacity = - Resources.divide(resourceCalculator, clusterResource, - potentialTotalCapacity, - labelManager.getResourceByLabel(label, clusterResource)); - // if enabled, check to see if could we potentially use this node instead - // of a reserved node if the application has reserved containers - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking && checkReservations - && label.equals(RMNodeLabelsManager.NO_LABEL)) { - float potentialNewWithoutReservedCapacity = Resources.divide( - resourceCalculator, - clusterResource, - Resources.subtract(potentialTotalCapacity, - application.getCurrentReservation()), - labelManager.getResourceByLabel(label, clusterResource)); - - if (potentialNewWithoutReservedCapacity <= queueCapacities - .getAbsoluteMaximumCapacity()) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " - + getQueueName() - + " usedResources: " - + queueUsage.getUsed() - + " clusterResources: " - + clusterResource - + " reservedResources: " - + application.getCurrentReservation() - + " currentCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(), clusterResource) + " required " + required - + " potentialNewWithoutReservedCapacity: " - + potentialNewWithoutReservedCapacity + " ( " - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity() + ")"); - } - // we could potentially use this node instead of reserved node - return true; - } - } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. - if (potentialNewCapacity > queueCapacities - .getAbsoluteMaximumCapacity(label) + 1e-4) { - canAssign = false; - break; - } - - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " potentialNewCapacity: " + potentialNewCapacity + " ( " - + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity() - + ")"); - } - } - - return canAssign; - } - private Resource computeQueueCurrentLimitAndSetHeadroomInfo( + private void setQueueResourceLimitsInfo( Resource clusterResource) { - Resource queueCurrentResourceLimit = - getCurrentResourceLimit(clusterResource, currentResourceLimits); - synchronized (queueResourceLimitsInfo) { - queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit); + queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits + .getLimit()); queueResourceLimitsInfo.setClusterResource(clusterResource); } - - return queueCurrentResourceLimit; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) @@ -1048,16 +958,16 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - Resource currentResourceLimit = - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit); + getHeadroom(queueUser, currentResourceLimits.getLimit(), + clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxAvailRes=" + currentResourceLimit + + " queueMaxAvailRes=" + currentResourceLimits.getLimit() + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1207,8 +1117,8 @@ protected synchronized boolean assignToUser(Resource clusterResource, return true; } - boolean needContainers(FiCaSchedulerApp application, Priority priority, - Resource required) { + boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application, + Priority priority, Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1240,7 +1150,7 @@ resourceCalculator, required, getMaximumAllocation() private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve) { + RMContainer reservedContainer) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1252,7 +1162,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, requestType = NodeType.NODE_LOCAL; assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1280,7 +1190,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1308,7 +1218,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, application, priority, reservedContainer, needToUnreserve, + node, application, priority, reservedContainer, allocatedContainer); // update locality statistics @@ -1320,13 +1230,24 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } + + private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { + // First we need to get minimum resource we need unreserve + // minimum-resource-need-unreserve = used + asked - limit + Resource minimumUnreservedResource = + Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), + currentResourceLimits.getLimit()); + return minimumUnreservedResource; + } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource capability) { + Resource askedResource, Resource minimumUnreservedResource) { // need to unreserve some other container first - NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability); + NodeId idToUnreserve = + application.getNodeIdToUnreserve(priority, minimumUnreservedResource, + resourceCalculator, clusterResource); if (idToUnreserve == null) { if (LOG.isDebugEnabled()) { LOG.debug("checked to see if could unreserve for app but nothing " @@ -1343,7 +1264,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource, LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application and place it on node: " - + node.getNodeID() + " needing: " + capability); + + node.getNodeID() + " needing: " + askedResource); } // headroom @@ -1364,15 +1285,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource, @Private protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability, - boolean needToUnreserve) { - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate"); - } - return false; - } - + FiCaSchedulerApp application, Resource capability) { // we can't reserve if we got here based on the limit // checks assuming we could unreserve!!! Resource userLimit = computeUserLimitAndSetHeadroom(application, @@ -1380,7 +1293,8 @@ protected boolean checkLimitsToReserve(Resource clusterResource, // Check queue max-capacity limit, // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) { + if (!canAssignToThisQueue(clusterResource, null, + this.currentResourceLimits, capability, Resources.none())) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit queue limit"); } @@ -1402,43 +1316,40 @@ protected boolean checkLimitsToReserve(Resource clusterResource, private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); } - private Resource assignRackLocalContainers( - Resource clusterResource, ResourceRequest rackLocalResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + private Resource assignRackLocalContainers(Resource clusterResource, + ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); } - private Resource assignOffSwitchContainers( - Resource clusterResource, ResourceRequest offSwitchResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, boolean needToUnreserve, - MutableObject allocatedContainer) { + private Resource assignOffSwitchContainers(Resource clusterResource, + ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, MutableObject allocatedContainer) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - needToUnreserve, allocatedContainer); + allocatedContainer); } return Resources.none(); @@ -1522,13 +1433,12 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - boolean needToUnreserve, MutableObject createdContainer) { + MutableObject createdContainer) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type - + " needToUnreserve= " + needToUnreserve); + + " request=" + request + " type=" + type); } // check if the resource request can access the label @@ -1548,12 +1458,14 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod Resource available = node.getAvailableResource(); Resource totalResource = node.getTotalResource(); - if (!Resources.fitsIn(capability, totalResource)) { + if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource, + capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); return Resources.none(); } + assert Resources.greaterThan( resourceCalculator, clusterResource, available, Resources.none()); @@ -1566,18 +1478,9 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod LOG.warn("Couldn't get container for allocation!"); return Resources.none(); } - - // default to true since if reservation continue look feature isn't on - // needContainers is checked earlier and we wouldn't have gotten this far - boolean canAllocContainer = true; - if (this.reservationsContinueLooking) { - // based on reservations can we allocate/reserve more or do we need - // to unreserve one first - canAllocContainer = needContainers(application, priority, capability); - if (LOG.isDebugEnabled()) { - LOG.debug("can alloc container is: " + canAllocContainer); - } - } + + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( + application, priority, capability); // Can we allocate a container on this node? int availableContainers = @@ -1588,25 +1491,25 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // Did we previously reserve containers at this 'priority'? if (rmContainer != null) { unreserve(application, priority, node, rmContainer); - } else if (this.reservationsContinueLooking - && (!canAllocContainer || needToUnreserve)) { - // need to unreserve some other container first - boolean res = findNodeToUnreserve(clusterResource, node, application, - priority, capability); - if (!res) { - return Resources.none(); - } - } else { - // we got here by possibly ignoring queue capacity limits. If the - // parameter needToUnreserve is true it means we ignored one of those - // limits in the chance we could unreserve. If we are here we aren't - // trying to unreserve so we can't allocate anymore due to that parent - // limit. - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate, skipping"); + } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { + // when reservationsContinueLooking is set, we may need to unreserve + // some containers to meet this queue and its parents' resource limits + // TODO, need change here when we want to support continuous reservation + // looking for labeled partitions. + Resource minimumUnreservedResource = + getMinimumResourceNeedUnreserved(capability); + if (!shouldAllocOrReserveNewContainer + || Resources.greaterThan(resourceCalculator, clusterResource, + minimumUnreservedResource, Resources.none())) { + boolean containerUnreserved = + findNodeToUnreserve(clusterResource, node, application, priority, + capability, minimumUnreservedResource); + // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved + // container (That means we *have to* unreserve some resource to + // continue)). If we failed to unreserve some resource, + if (!containerUnreserved) { + return Resources.none(); } - return Resources.none(); } } @@ -1632,17 +1535,16 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } else { // if we are allowed to allocate but this node doesn't have space, reserve it or // if this was an already a reserved container, reserve it again - if ((canAllocContainer) || (rmContainer != null)) { + if (shouldAllocOrReserveNewContainer || rmContainer != null) { - if (reservationsContinueLooking) { - // we got here by possibly ignoring parent queue capacity limits. If - // the parameter needToUnreserve is true it means we ignored one of - // those limits in the chance we could unreserve. If we are here - // we aren't trying to unreserve so we can't allocate - // anymore due to that parent limit - boolean res = checkLimitsToReserve(clusterResource, application, capability, - needToUnreserve); - if (!res) { + if (reservationsContinueLooking && rmContainer == null) { + // we could possibly ignoring parent queue capacity limits when + // reservationsContinueLooking is set. + // If we're trying to reserve a container here, not container will be + // unreserved for reserving the new one. Check limits again before + // reserve the new container + if (!checkLimitsToReserve(clusterResource, + application, capability)) { return Resources.none(); } } @@ -1784,18 +1686,36 @@ private void updateAbsoluteCapacityResource(Resource clusterResource) { Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource, queueCapacities.getAbsoluteCapacity(), minimumAllocation); } + + private void updateCurrentResourceLimits( + ResourceLimits currentResourceLimits, Resource clusterResource) { + // TODO: need consider non-empty node labels when resource limits supports + // node labels + // Even if ParentQueue will set limits respect child's max queue capacity, + // but when allocating reserved container, CapacityScheduler doesn't do + // this. So need cap limits by queue's max capacity here. + this.currentResourceLimits = currentResourceLimits; + Resource queueMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + queueCapacities + .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), + minimumAllocation); + this.currentResourceLimits.setLimit(Resources.min(resourceCalculator, + clusterResource, queueMaxResource, currentResourceLimits.getLimit())); + } @Override public synchronized void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { - this.currentResourceLimits = currentResourceLimits; + updateCurrentResourceLimits(currentResourceLimits, clusterResource); lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); + setQueueResourceLimitsInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7feaa152fbb..5ed6bb8c932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.Sets; - @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -380,8 +376,7 @@ private synchronized void removeApplication(ApplicationId applicationId, @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits resourceLimits) { + FiCaSchedulerNode node, ResourceLimits resourceLimits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); Set nodeLabels = node.getLabels(); @@ -397,21 +392,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + getQueueName()); } - boolean localNeedToUnreserve = false; - // Are we over maximum-capacity for this queue? - if (!canAssignToThisQueue(clusterResource, nodeLabels)) { - // check to see if we could if we unreserve first - localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource); - if (!localNeedToUnreserve) { - break; - } + // This will also consider parent's limits and also continuous reservation + // looking + if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, + minimumAllocation, Resources.createResource(getMetrics() + .getReservedMB(), getMetrics().getReservedVirtualCores()))) { + break; } // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, - localNeedToUnreserve | needToUnreserve, resourceLimits); + assignContainersToChildQueues(clusterResource, node, resourceLimits); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -459,74 +451,6 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } - private synchronized boolean canAssignToThisQueue(Resource clusterResource, - Set nodeLabels) { - Set labelCanAccess = - new HashSet( - accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels - : Sets.intersection(accessibleLabels, nodeLabels)); - if (nodeLabels.isEmpty()) { - // Any queue can always access any node without label - labelCanAccess.add(RMNodeLabelsManager.NO_LABEL); - } - - boolean canAssign = true; - for (String label : labelCanAccess) { - float currentAbsoluteLabelUsedCapacity = - Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)); - // if any of the label doesn't beyond limit, we can allocate on this node - if (currentAbsoluteLabelUsedCapacity >= - queueCapacities.getAbsoluteMaximumCapacity(label)) { - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + queueUsage.getUsed() - + " current-capacity (" + queueUsage.getUsed(label) + ") " - + " >= max-capacity (" - + labelManager.getResourceByLabel(label, clusterResource) + ")"); - } - canAssign = false; - break; - } - } - - return canAssign; - } - - - private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) { - if (this.reservationsContinueLooking) { - // check to see if we could potentially use this node instead of a reserved - // node - - Resource reservedResources = Resources.createResource(getMetrics() - .getReservedMB(), getMetrics().getReservedVirtualCores()); - float capacityWithoutReservedCapacity = Resources.divide( - resourceCalculator, clusterResource, - Resources.subtract(queueUsage.getUsed(), reservedResources), - clusterResource); - - if (capacityWithoutReservedCapacity <= queueCapacities - .getAbsoluteMaximumCapacity()) { - if (LOG.isDebugEnabled()) { - LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed().getMemory() - + " clusterResources: " + clusterResource.getMemory() - + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) queueUsage.getUsed().getMemory()) - / clusterResource.getMemory() - + " potentialNewWithoutReservedCapacity: " - + capacityWithoutReservedCapacity + " ( " + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity() + ")"); - } - // we could potentially use this node instead of reserved node - return true; - } - } - return false; - } - - private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { return (node.getReservedContainer() == null) && Resources.greaterThanOrEqual(resourceCalculator, clusterResource, @@ -534,28 +458,38 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { } private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits myLimits) { - /* - * Set head-room of a given child, limit = - * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used - * + child.used. To avoid any of this queue's and its ancestors' limit - * being violated - */ - Resource myCurrentLimit = - getCurrentResourceLimit(clusterResource, myLimits); - // My available resource = my-current-limit - my-used-resource - Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit, - getUsedResources()); - // Child's limit = my-available-resource + resource-already-used-by-child + Resource clusterResource, ResourceLimits parentLimits) { + // Set resource-limit of a given child, child.limit = + // min(my.limit - my.used + child.used, child.max) + + // Parent available resource = parent-limit - parent-used-resource + Resource parentMaxAvailableResource = + Resources.subtract(parentLimits.getLimit(), getUsedResources()); + + // Child's limit = parent-available-resource + child-used Resource childLimit = - Resources.add(myMaxAvailableResource, child.getUsedResources()); - + Resources.add(parentMaxAvailableResource, child.getUsedResources()); + + // Get child's max resource + Resource childConfiguredMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + child.getAbsoluteMaximumCapacity(), minimumAllocation); + + // Child's limit should be capped by child configured max resource + childLimit = + Resources.min(resourceCalculator, clusterResource, childLimit, + childConfiguredMaxResource); + + // Normalize before return + childLimit = + Resources.roundDown(resourceCalculator, childLimit, minimumAllocation); + return new ResourceLimits(childLimit); } private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve, - ResourceLimits limits) { + Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -573,9 +507,7 @@ private synchronized CSAssignment assignContainersToChildQueues( ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits); - assignment = - childQueue.assignContainers(cluster, node, needToUnreserve, - childLimits); + assignment = childQueue.assignContainers(cluster, node, childLimits); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9f97b137f2d..6cc27775862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -274,7 +274,8 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, } synchronized public NodeId getNodeIdToUnreserve(Priority priority, - Resource capability) { + Resource resourceNeedUnreserve, ResourceCalculator rc, + Resource clusterResource) { // first go around make this algorithm simple and just grab first // reservation that has enough resources @@ -283,16 +284,19 @@ synchronized public NodeId getNodeIdToUnreserve(Priority priority, if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { for (Map.Entry entry : reservedContainers.entrySet()) { + NodeId nodeId = entry.getKey(); + Resource containerResource = entry.getValue().getContainer().getResource(); + // make sure we unreserve one with at least the same amount of // resources, otherwise could affect capacity limits - if (Resources.fitsIn(capability, entry.getValue().getContainer() - .getResource())) { + if (Resources.lessThanOrEqual(rc, clusterResource, + resourceNeedUnreserve, containerResource)) { if (LOG.isDebugEnabled()) { LOG.debug("unreserving node with reservation size: " - + entry.getValue().getContainer().getResource() - + " in order to allocate container with size: " + capability); + + containerResource + + " in order to allocate container with size: " + resourceNeedUnreserve); } - return entry.getKey(); + return nodeId; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 8cad05725ba..1ca5c97a411 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -611,7 +611,7 @@ public void testHeadroom() throws Exception { app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -631,7 +631,7 @@ public void testHeadroom() throws Exception { app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change @@ -651,7 +651,7 @@ public void testHeadroom() throws Exception { app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -660,7 +660,7 @@ public void testHeadroom() throws Exception { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 83ab1046288..7a265dc2f40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -125,6 +125,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.ComparisonFailure; import org.junit.Test; import org.mockito.Mockito; @@ -2483,6 +2484,64 @@ public void testHierarchyQueuesCurrentLimits() throws Exception { Assert.assertEquals(30 * GB, am1.doHeartbeat().getAvailableResources().getMemory()); } + + @Test + public void testParentQueueMaxCapsAreRespected() throws Exception { + /* + * Queue tree: + * Root + * / \ + * A B + * / \ + * A1 A2 + */ + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + csConf.setCapacity(A, 50); + csConf.setMaximumCapacity(A, 50); + csConf.setCapacity(B, 50); + + // Define 2nd-level queues + csConf.setQueues(A, new String[] {"a1", "a2"}); + csConf.setCapacity(A1, 50); + csConf.setUserLimitFactor(A1, 100.0f); + csConf.setCapacity(A2, 50); + csConf.setUserLimitFactor(A2, 100.0f); + csConf.setCapacity(B1, B1_CAPACITY); + csConf.setUserLimitFactor(B1, 100.0f); + + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1); + + // Try to launch app2 in a2, asked 2GB, should success + RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + try { + // Try to allocate a container, a's usage=11G/max=12 + // a1's usage=9G/max=12 + // a2's usage=2G/max=12 + // In this case, if a2 asked 2G, should fail. + waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1); + } catch (AssertionError failure) { + // Expected, return; + return; + } + Assert.fail("Shouldn't successfully allocate containers for am2, " + + "queue-a's max capacity will be violated if container allocated"); + } private void setMaxAllocMb(Configuration conf, int maxAllocMb) { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 7edb17d0e7e..71dc523c9dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -145,7 +144,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). when(queue) - .assignContainers(eq(clusterResource), eq(node), anyBoolean(), + .assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); // Mock the node's resource availability @@ -157,7 +156,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), + when(queue).assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -274,7 +273,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); for(int i=0; i < 2; i++) { @@ -282,7 +281,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } for(int i=0; i < 3; i++) @@ -291,7 +290,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } for(int i=0; i < 4; i++) @@ -300,7 +299,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } verifyQueueMetrics(a, 1*GB, clusterResource); @@ -334,7 +333,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); } verifyQueueMetrics(a, 3*GB, clusterResource); @@ -362,7 +361,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); @@ -389,7 +388,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -404,13 +403,13 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + root.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource)); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); + any(FiCaSchedulerNode.class), any(ResourceLimits.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); + any(FiCaSchedulerNode.class), any(ResourceLimits.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index a5a2e5fe0dd..972cabbf2cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -350,8 +350,8 @@ public void testSingleQueueOneUserMetrics() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false, - new ResourceLimits(clusterResource)); + a.assignContainers(clusterResource, node_0, new ResourceLimits( + clusterResource)); assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -486,7 +486,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -497,7 +497,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -506,7 +506,7 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Can't allocate 3rd due to user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -516,7 +516,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -525,7 +525,7 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(3*GB, a.getMetrics().getAllocatedMB()); // One more should work, for app_1, due to user-limit-factor - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -536,8 +536,8 @@ public void testSingleQueueWithOneUser() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, false, - new ResourceLimits(clusterResource)); + a.assignContainers(clusterResource, node_0, new ResourceLimits( + clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -652,21 +652,21 @@ public void testUserLimits() throws Exception { // recordFactory))); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); // One more to user_0 since he is the only active user - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -718,7 +718,7 @@ public void testComputeUserLimitAndSetHeadroom(){ assertEquals("There should only be 1 active user!", 1, qb.getActiveUsersManager().getNumActiveUsers()); //get headroom - qb.assignContainers(clusterResource, node_0, false, + qb.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), @@ -738,7 +738,7 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, u1Priority, recordFactory))); qb.submitApplicationAttempt(app_2, user_1); - qb.assignContainers(clusterResource, node_1, false, + qb.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), @@ -781,9 +781,9 @@ public void testComputeUserLimitAndSetHeadroom(){ u1Priority, recordFactory))); qb.submitApplicationAttempt(app_1, user_0); qb.submitApplicationAttempt(app_3, user_1); - qb.assignContainers(clusterResource, node_0, false, + qb.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); - qb.assignContainers(clusterResource, node_0, false, + qb.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3 .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(), @@ -802,7 +802,7 @@ public void testComputeUserLimitAndSetHeadroom(){ app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, u0Priority, recordFactory))); - qb.assignContainers(clusterResource, node_1, false, + qb.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), @@ -875,7 +875,7 @@ public void testUserHeadroomMultiApp() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -892,7 +892,7 @@ public void testUserHeadroomMultiApp() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -981,7 +981,7 @@ public void testHeadroomWithMaxCap() throws Exception { 1, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -992,7 +992,7 @@ public void testHeadroomWithMaxCap() throws Exception { // the application is not yet active // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1009,7 +1009,7 @@ public void testHeadroomWithMaxCap() throws Exception { // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1023,7 +1023,7 @@ public void testHeadroomWithMaxCap() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @@ -1094,7 +1094,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { */ // Only 1 container - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1102,7 +1102,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1110,7 +1110,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Can't allocate 3rd due to user-limit a.setUserLimit(25); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1129,7 +1129,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1139,7 +1139,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_0 since // user_0 is at user-limit not above it - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -1150,7 +1150,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -1162,7 +1162,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now, allocations should goto app_3 since it's under user-limit a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(7*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -1171,7 +1171,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); // Now we should assign to app_3 again since user_2 is under user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); @@ -1271,7 +1271,7 @@ public void testReservation() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1282,7 +1282,7 @@ public void testReservation() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1291,7 +1291,7 @@ public void testReservation() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1308,7 +1308,7 @@ public void testReservation() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1325,7 +1325,7 @@ public void testReservation() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -1393,7 +1393,7 @@ public void testStolenReservedContainer() throws Exception { // Start testing... - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1403,7 +1403,7 @@ public void testStolenReservedContainer() throws Exception { assertEquals(0*GB, a.getMetrics().getAvailableMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1417,7 +1417,7 @@ public void testStolenReservedContainer() throws Exception { // We do not need locality delay here doReturn(-1).when(a).getNodeLocalityDelay(); - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(10*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1434,7 +1434,7 @@ public void testStolenReservedContainer() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -1503,7 +1503,7 @@ public void testReservationExchange() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1511,14 +1511,14 @@ public void testReservationExchange() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); @@ -1533,7 +1533,7 @@ public void testReservationExchange() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1543,7 +1543,7 @@ public void testReservationExchange() throws Exception { assertEquals(1, app_1.getReReservations(priority)); // Re-reserve - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1553,7 +1553,7 @@ public void testReservationExchange() throws Exception { assertEquals(2, app_1.getReReservations(priority)); // Try to schedule on node_1 now, should *move* the reservation - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(9*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -1571,7 +1571,7 @@ public void testReservationExchange() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - CSAssignment assignment = a.assignContainers(clusterResource, node_0, false, + CSAssignment assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -1643,7 +1643,7 @@ public void testLocalityScheduling() throws Exception { CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1652,7 +1652,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1661,7 +1661,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1671,7 +1671,7 @@ public void testLocalityScheduling() throws Exception { // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - assignment = a.assignContainers(clusterResource, node_2, false, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1680,7 +1680,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - assignment = a.assignContainers(clusterResource, node_0, false, + assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1689,7 +1689,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - assignment = a.assignContainers(clusterResource, node_1, false, + assignment = a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1718,14 +1718,14 @@ public void testLocalityScheduling() throws Exception { doReturn(1).when(a).getNodeLocalityDelay(); // Shouldn't assign RACK_LOCAL yet - assignment = a.assignContainers(clusterResource, node_3, false, + assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now - assignment = a.assignContainers(clusterResource, node_3, false, + assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1807,7 +1807,7 @@ public void testApplicationPriorityScheduling() throws Exception { // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); @@ -1820,7 +1820,7 @@ public void testApplicationPriorityScheduling() throws Exception { // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); @@ -1832,7 +1832,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); @@ -1844,7 +1844,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), eq(priority_1), any(ResourceRequest.class), any(Container.class)); @@ -1856,7 +1856,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), eq(priority_1), any(ResourceRequest.class), any(Container.class)); @@ -1933,7 +1933,7 @@ public void testSchedulingConstraints() throws Exception { app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 - a.assignContainers(clusterResource, node_0_0, false, + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1942,7 +1942,7 @@ public void testSchedulingConstraints() throws Exception { // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 - a.assignContainers(clusterResource, node_1_0, false, + a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1959,7 +1959,7 @@ public void testSchedulingConstraints() throws Exception { // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 - a.assignContainers(clusterResource, node_0_1, false, + a.assignContainers(clusterResource, node_0_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -1967,7 +1967,7 @@ public void testSchedulingConstraints() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1_0, false, + a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2220,7 +2220,7 @@ public void testLocalityConstraints() throws Exception { // node_0_1 // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false - a.assignContainers(clusterResource, node_0_1, false, + a.assignContainers(clusterResource, node_0_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2243,7 +2243,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false - a.assignContainers(clusterResource, node_1_1, false, + a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2274,7 +2274,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, false, + a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2303,7 +2303,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, false, + a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2330,7 +2330,7 @@ public void testLocalityConstraints() throws Exception { // Blacklist: < host_0_0 > <---- // Now, should allocate since RR(rack_1) = relax: true - a.assignContainers(clusterResource, node_1_1, false, + a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource)); verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2361,7 +2361,7 @@ public void testLocalityConstraints() throws Exception { // host_1_0: 8G // host_1_1: 7G - a.assignContainers(clusterResource, node_1_0, false, + a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); @@ -2444,7 +2444,7 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() recordFactory))); try { - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 4f8938607e7..7da1c97fec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -156,7 +156,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) - .assignContainers(eq(clusterResource), eq(node), eq(false), + .assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); // Mock the node's resource availability @@ -167,8 +167,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } - }). -when(queue).assignContainers(eq(clusterResource), eq(node), eq(false), + }).when(queue).assignContainers(eq(clusterResource), eq(node), any(ResourceLimits.class)); } @@ -232,7 +231,7 @@ public void testSingleLevelQueues() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -240,13 +239,13 @@ public void testSingleLevelQueues() throws Exception { // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, false, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -254,13 +253,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 2/6G while B has 2/14G stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(b, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -268,13 +267,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 4/14G stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 4*GB); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -282,13 +281,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 8/14G stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, false, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -405,6 +404,22 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) { @Test public void testMultiLevelQueues() throws Exception { + /* + * Structure of queue: + * Root + * ____________ + * / | \ \ + * A B C D + * / | / | \ \ + * A1 A2 B1 B2 B3 C1 + * \ + * C11 + * \ + * C111 + * \ + * C1111 + */ + // Setup queue configs setupMultiLevelQueues(csConf); @@ -449,7 +464,7 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 0*GB, clusterResource); @@ -462,7 +477,7 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a, clusterResource, node_1, 0*GB); stubQueueAllocation(b2, clusterResource, node_1, 4*GB); stubQueueAllocation(c, clusterResource, node_1, 0*GB); - root.assignContainers(clusterResource, node_1, false, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -474,15 +489,15 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a1, clusterResource, node_0, 1*GB); stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -501,17 +516,17 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b3, clusterResource, node_2, 1*GB); stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); - root.assignContainers(clusterResource, node_2, false, + root.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -611,7 +626,7 @@ public void testOffSwitchScheduling() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -620,13 +635,13 @@ public void testOffSwitchScheduling() throws Exception { // also, B gets a scheduling opportunity since A allocates RACK_LOCAL stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1, false, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -635,13 +650,13 @@ public void testOffSwitchScheduling() throws Exception { // However, since B returns off-switch, A won't get an opportunity stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -680,7 +695,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // Simulate B3 returning a container on node_0 stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); verifyQueueMetrics(b2, 0*GB, clusterResource); verifyQueueMetrics(b3, 1*GB, clusterResource); @@ -689,13 +704,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1, false, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -704,13 +719,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // However, since B3 returns off-switch, B2 won't get an opportunity stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); + any(FiCaSchedulerNode.class), anyResourceLimits()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index b3250e5449a..c5b7587a399 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -265,7 +265,7 @@ public void testReservation() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -277,7 +277,7 @@ public void testReservation() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -289,7 +289,7 @@ public void testReservation() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -304,7 +304,7 @@ public void testReservation() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -320,7 +320,7 @@ public void testReservation() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -337,7 +337,7 @@ public void testReservation() throws Exception { // node_1 heartbeat and unreserves from node_0 in order to allocate // on node_1 - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); @@ -421,7 +421,7 @@ public void testReservationNoContinueLook() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -433,7 +433,7 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -445,7 +445,7 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -460,7 +460,7 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -476,7 +476,7 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -493,7 +493,7 @@ public void testReservationNoContinueLook() throws Exception { // node_1 heartbeat and won't unreserve from node_0, potentially stuck // if AM doesn't handle - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -569,7 +569,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -580,7 +580,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -591,7 +591,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -605,7 +605,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -620,7 +620,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // could allocate but told need to unreserve first - a.assignContainers(clusterResource, node_1, true, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -652,6 +652,8 @@ public void testGetAppToUnreserve() throws Exception { String host_1 = "host_1"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8 * GB); + + Resource clusterResource = Resources.createResource(2 * 8 * GB); // Setup resource-requests Priority priorityMap = TestUtils.createMockPriority(5); @@ -681,23 +683,28 @@ public void testGetAppToUnreserve() throws Exception { node_0.getNodeID(), "user", rmContext); // no reserved containers - NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + NodeId unreserveId = + app_0.getNodeIdToUnreserve(priorityMap, capability, + cs.getResourceCalculator(), clusterResource); assertEquals(null, unreserveId); // no reserved containers - reserve then unreserve app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); app_0.unreserve(node_0, priorityMap); - unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, + cs.getResourceCalculator(), clusterResource); assertEquals(null, unreserveId); // no container large enough is reserved app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); - unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, + cs.getResourceCalculator(), clusterResource); assertEquals(null, unreserveId); // reserve one that is now large enough app_0.reserve(node_1, priorityMap, rmContainer, container); - unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability, + cs.getResourceCalculator(), clusterResource); assertEquals(node_1.getNodeID(), unreserveId); } @@ -741,14 +748,14 @@ public void testFindNodeToUnreserve() throws Exception { // nothing reserved boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), - node_1, app_0, priorityMap, capability); + node_1, app_0, priorityMap, capability, capability); assertFalse(res); // reserved but scheduler doesn't know about that node. app_0.reserve(node_1, priorityMap, rmContainer, container); node_1.reserveResource(app_0, priorityMap, rmContainer); res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, - priorityMap, capability); + priorityMap, capability, capability); assertFalse(res); } @@ -815,7 +822,7 @@ public void testAssignToQueue() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -826,7 +833,7 @@ public void testAssignToQueue() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -837,7 +844,7 @@ public void testAssignToQueue() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -852,14 +859,15 @@ public void testAssignToQueue() throws Exception { // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); boolean res = - a.canAssignToThisQueue(clusterResource, capability, - CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true); + a.canAssignToThisQueue(clusterResource, + CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( + clusterResource), capability, Resources.none()); assertFalse(res); // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -872,14 +880,17 @@ public void testAssignToQueue() throws Exception { capability = Resources.createResource(5 * GB, 0); res = - a.canAssignToThisQueue(clusterResource, capability, - CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true); + a.canAssignToThisQueue(clusterResource, + CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( + clusterResource), capability, Resources + .createResource(5 * GB)); assertTrue(res); // tell to not check reservations res = - a.canAssignToThisQueue(clusterResource, capability, - CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false); + a.canAssignToThisQueue(clusterResource, + CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( + clusterResource), capability, Resources.none()); assertFalse(res); refreshQueuesTurnOffReservationsContLook(a, csConf); @@ -887,13 +898,16 @@ public void testAssignToQueue() throws Exception { // should return false no matter what checkReservations is passed // in since feature is off res = - a.canAssignToThisQueue(clusterResource, capability, - CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false); + a.canAssignToThisQueue(clusterResource, + CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( + clusterResource), capability, Resources.none()); assertFalse(res); res = - a.canAssignToThisQueue(clusterResource, capability, - CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true); + a.canAssignToThisQueue(clusterResource, + CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( + clusterResource), capability, Resources + .createResource(5 * GB)); assertFalse(res); } @@ -984,16 +998,16 @@ public void testAssignToUser() throws Exception { app_0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priorityAM, recordFactory))); - app_0.updateResourceRequests(Collections.singletonList(TestUtils - .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, - priorityReduce, recordFactory))); app_0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, priorityMap, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1004,7 +1018,7 @@ public void testAssignToUser() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1015,7 +1029,7 @@ public void testAssignToUser() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1029,7 +1043,7 @@ public void testAssignToUser() throws Exception { // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1116,19 +1130,19 @@ public void testReservationsNoneAvailable() throws Exception { app_0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priorityAM, recordFactory))); - app_0.updateResourceRequests(Collections.singletonList(TestUtils - .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true, - priorityReduce, recordFactory))); app_0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, priorityMap, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true, + priorityReduce, recordFactory))); app_0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true, priorityLast, recordFactory))); // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1140,7 +1154,7 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1152,7 +1166,7 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false, + a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1164,38 +1178,41 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(3 * GB, node_1.getUsedResource().getMemory()); assertEquals(0 * GB, node_2.getUsedResource().getMemory()); - // try to assign reducer (5G on node 0), but tell it - // it has to unreserve. No room to allocate and shouldn't reserve - // since nothing currently reserved. - a.assignContainers(clusterResource, node_0, true, - new ResourceLimits(clusterResource)); + // try to assign reducer (5G on node 0), but tell it's resource limits < + // used (8G) + required (5G). It will not reserved since it has to unreserve + // some resource. Even with continous reservation looking, we don't allow + // unreserve resource to reserve container. + a.assignContainers(clusterResource, node_0, + new ResourceLimits(Resources.createResource(10 * GB))); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); assertEquals(16 * GB, a.getMetrics().getAvailableMB()); - assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + // app_0's headroom = limit (10G) - used (8G) = 2G + assertEquals(2 * GB, app_0.getHeadroom().getMemory()); assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); assertEquals(0 * GB, node_2.getUsedResource().getMemory()); - // try to assign reducer (5G on node 2), but tell it - // it has to unreserve. Has room but shouldn't reserve - // since nothing currently reserved. - a.assignContainers(clusterResource, node_2, true, - new ResourceLimits(clusterResource)); + // try to assign reducer (5G on node 0), but tell it's resource limits < + // used (8G) + required (5G). It will not reserved since it has to unreserve + // some resource. Unfortunately, there's nothing to unreserve. + a.assignContainers(clusterResource, node_2, + new ResourceLimits(Resources.createResource(10 * GB))); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); assertEquals(16 * GB, a.getMetrics().getAvailableMB()); - assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + // app_0's headroom = limit (10G) - used (8G) = 2G + assertEquals(2 * GB, app_0.getHeadroom().getMemory()); assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // let it assign 5G to node_2 - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1208,7 +1225,7 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(5 * GB, node_2.getUsedResource().getMemory()); // reserve 8G node_0 - a.assignContainers(clusterResource, node_0, false, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource)); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); @@ -1223,7 +1240,7 @@ public void testReservationsNoneAvailable() throws Exception { // try to assign (8G on node 2). No room to allocate, // continued to try due to having reservation above, // but hits queue limits so can't reserve anymore. - a.assignContainers(clusterResource, node_2, false, + a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource)); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());