From 84f7641f7eb1caadf28a36af9bd8e2f2e59cfe90 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 11 May 2015 15:13:39 -0700 Subject: [PATCH] YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation. (Thomas Graves via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/ResourceLimits.java | 29 +++- .../scheduler/capacity/AbstractCSQueue.java | 89 ++++++----- .../scheduler/capacity/LeafQueue.java | 149 ++++++++---------- .../scheduler/capacity/TestReservations.java | 65 +++++--- 5 files changed, 177 insertions(+), 158 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 10b2a932737..08cfb0c0b55 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -74,6 +74,9 @@ Release 2.7.1 - UNRELEASED YARN-3476. Nodemanager can fail to delete local logs if log aggregation fails (Rohith via jlowe) + YARN-3434. Interaction between reservations and userlimit can result in + significant ULF violation. (Thomas Graves via wangda) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 12333e877b9..00107f1c608 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -19,22 +19,43 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Resource limits for queues/applications, this means max overall (please note * that, it's not "extra") resource you can get. */ public class ResourceLimits { + volatile Resource limit; + + // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES + // config. This limit indicates how much we need to unreserve to allocate + // another container. + private volatile Resource amountNeededUnreserve; + public ResourceLimits(Resource limit) { + this.amountNeededUnreserve = Resources.none(); this.limit = limit; } - - volatile Resource limit; + + public ResourceLimits(Resource limit, Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; + this.limit = limit; + } + public Resource getLimit() { return limit; } - + + public Resource getAmountNeededUnreserve() { + return amountNeededUnreserve; + } + public void setLimit(Resource limit) { this.limit = limit; } -} + + public void setAmountNeededUnreserve(Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 4e53060beb0..0c9bff95382 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -453,54 +453,53 @@ public abstract class AbstractCSQueue implements CSQueue { Resource currentLimitResource = getCurrentLimitResource(label, clusterResource, currentResourceLimits); - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && label.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(newTotalResource, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); - } - return true; - } - } - - // Otherwise, if any of the label of this node beyond queue limit, we - // cannot allocate on this node. Consider a small epsilon here. if (Resources.greaterThan(resourceCalculator, clusterResource, newTotalResource, currentLimitResource)) { - return false; - } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, label=" + label - + " usedResources: " + queueUsage.getUsed(label) - + " clusterResources: " + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(label), - labelManager.getResourceByLabel(label, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(label) - + ")"); + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && label.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource, + currentLimitResource)); + return true; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, label=" + label + + " usedResources: " + queueUsage.getUsed(label) + + " clusterResources: " + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(label) + + ")"); + } + return false; } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index fa0e280db47..65a74724357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -119,7 +119,7 @@ public class LeafQueue extends AbstractCSQueue { private final QueueResourceLimitsInfo queueResourceLimitsInfo = new QueueResourceLimitsInfo(); - private volatile ResourceLimits currentResourceLimits = null; + private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -149,7 +149,7 @@ public class LeafQueue extends AbstractCSQueue { this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); - this.currentResourceLimits = new ResourceLimits(clusterResource); + this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource); // Initialize headroom info, also used for calculating application // master resource limits. Since this happens during queue initialization @@ -820,13 +820,13 @@ public class LeafQueue extends AbstractCSQueue { // Check queue max-capacity limit if (!super.canAssignToThisQueue(clusterResource, node.getLabels(), - this.currentResourceLimits, required, application.getCurrentReservation())) { + currentResourceLimits, required, application.getCurrentReservation())) { return NULL_ASSIGNMENT; } // Check user limit if (!assignToUser(clusterResource, application.getUser(), userLimit, - application, true, requestedNodeLabels)) { + application, requestedNodeLabels, currentResourceLimits)) { break; } @@ -836,7 +836,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, currentResourceLimits); // Did the application skip this node? if (assignment.getSkipped()) { @@ -897,7 +897,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, new ResourceLimits(Resources.none())); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -943,7 +943,7 @@ public class LeafQueue extends AbstractCSQueue { private void setQueueResourceLimitsInfo( Resource clusterResource) { synchronized (queueResourceLimitsInfo) { - queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits + queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom .getLimit()); queueResourceLimitsInfo.setClusterResource(clusterResource); } @@ -964,13 +964,13 @@ public class LeafQueue extends AbstractCSQueue { setQueueResourceLimitsInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, currentResourceLimits.getLimit(), + getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxAvailRes=" + currentResourceLimits.getLimit() + + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1078,7 +1078,7 @@ public class LeafQueue extends AbstractCSQueue { @Private protected synchronized boolean assignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations, Set requestLabels) { + Set requestLabels, ResourceLimits currentResoureLimits) { User user = getUser(userName); String label = CommonNodeLabelsManager.NO_LABEL; @@ -1094,12 +1094,12 @@ public class LeafQueue extends AbstractCSQueue { limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations) { + if (this.reservationsContinueLooking) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { + Resources.subtract(user.getUsed(), application.getCurrentReservation()), + limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() @@ -1107,6 +1107,13 @@ public class LeafQueue extends AbstractCSQueue { + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } + Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(label), limit); + // we can only acquire a new container if we unreserve first since we ignored the + // user limit. Choose the max of user limit or what was previously set by max + // capacity. + currentResoureLimits.setAmountNeededUnreserve(Resources.max(resourceCalculator, + clusterResource, currentResoureLimits.getAmountNeededUnreserve(), + amountNeededToUnreserve)); return true; } } @@ -1153,7 +1160,7 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, ResourceLimits currentResoureLimits) { Resource assigned = Resources.none(); NodeType requestType = null; @@ -1166,7 +1173,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1194,7 +1201,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1222,7 +1229,7 @@ public class LeafQueue extends AbstractCSQueue { assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1233,20 +1240,11 @@ public class LeafQueue extends AbstractCSQueue { return SKIP_ASSIGNMENT; } - - private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { - // First we need to get minimum resource we need unreserve - // minimum-resource-need-unreserve = used + asked - limit - Resource minimumUnreservedResource = - Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), - currentResourceLimits.getLimit()); - return minimumUnreservedResource; - } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource askedResource, Resource minimumUnreservedResource) { + Resource minimumUnreservedResource) { // need to unreserve some other container first NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, minimumUnreservedResource, @@ -1267,7 +1265,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application and place it on node: " - + node.getNodeID() + " needing: " + askedResource); + + node.getNodeID() + " needing: " + minimumUnreservedResource); } // headroom @@ -1286,45 +1284,16 @@ public class LeafQueue extends AbstractCSQueue { return true; } - @Private - protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability) { - // we can't reserve if we got here based on the limit - // checks assuming we could unreserve!!! - Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability, null); - - // Check queue max-capacity limit, - // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, null, - this.currentResourceLimits, capability, Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit queue limit"); - } - return false; - } - - // Check user limit - if (!assignToUser(clusterResource, application.getUser(), userLimit, - application, false, null)) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit user limit"); - } - return false; - } - return true; - } - - private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); } return Resources.none(); @@ -1333,12 +1302,13 @@ public class LeafQueue extends AbstractCSQueue { private Resource assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); } return Resources.none(); @@ -1347,12 +1317,13 @@ public class LeafQueue extends AbstractCSQueue { private Resource assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer) { + RMContainer reservedContainer, MutableObject allocatedContainer, + ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer); + allocatedContainer, currentResoureLimits); } return Resources.none(); @@ -1436,7 +1407,7 @@ public class LeafQueue extends AbstractCSQueue { private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer) { + MutableObject createdContainer, ResourceLimits currentResoureLimits) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() @@ -1488,6 +1459,10 @@ public class LeafQueue extends AbstractCSQueue { // Can we allocate a container on this node? int availableContainers = resourceCalculator.computeAvailableContainers(available, capability); + + boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource, + currentResoureLimits.getAmountNeededUnreserve(), Resources.none()); + if (availableContainers > 0) { // Allocate... @@ -1496,20 +1471,24 @@ public class LeafQueue extends AbstractCSQueue { unreserve(application, priority, node, rmContainer); } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue and its parents' resource limits + // some containers to meet this queue, its parents', or the users' resource limits. // TODO, need change here when we want to support continuous reservation // looking for labeled partitions. - Resource minimumUnreservedResource = - getMinimumResourceNeedUnreserved(capability); - if (!shouldAllocOrReserveNewContainer - || Resources.greaterThan(resourceCalculator, clusterResource, - minimumUnreservedResource, Resources.none())) { + if (!shouldAllocOrReserveNewContainer || needToUnreserve) { + // If we shouldn't allocate/reserve new container then we should unreserve one the same + // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve + // could be zero. If the limit was hit then use the amount we need to unreserve to be + // under the limit. + Resource amountToUnreserve = capability; + if (needToUnreserve) { + amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve(); + } boolean containerUnreserved = findNodeToUnreserve(clusterResource, node, application, priority, - capability, minimumUnreservedResource); - // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved + amountToUnreserve); + // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, + // continue)). If we failed to unreserve some resource, we can't continue. if (!containerUnreserved) { return Resources.none(); } @@ -1541,13 +1520,13 @@ public class LeafQueue extends AbstractCSQueue { if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring parent queue capacity limits when - // reservationsContinueLooking is set. - // If we're trying to reserve a container here, not container will be - // unreserved for reserving the new one. Check limits again before - // reserve the new container - if (!checkLimitsToReserve(clusterResource, - application, capability)) { + // we could possibly ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } return Resources.none(); } } @@ -1679,8 +1658,8 @@ public class LeafQueue extends AbstractCSQueue { user.releaseContainer(resource, nodeLabels); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - LOG.info(getQueueName() + - " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + + LOG.info(getQueueName() + + " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getUsed()); } @@ -1697,14 +1676,14 @@ public class LeafQueue extends AbstractCSQueue { // Even if ParentQueue will set limits respect child's max queue capacity, // but when allocating reserved container, CapacityScheduler doesn't do // this. So need cap limits by queue's max capacity here. - this.currentResourceLimits = currentResourceLimits; + this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit()); Resource queueMaxResource = Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), queueCapacities .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), minimumAllocation); - this.currentResourceLimits.setLimit(Resources.min(resourceCalculator, + this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index c5b7587a399..826a5f22b70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -748,14 +748,14 @@ public class TestReservations { // nothing reserved boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), - node_1, app_0, priorityMap, capability, capability); + node_1, app_0, priorityMap, capability); assertFalse(res); // reserved but scheduler doesn't know about that node. app_0.reserve(node_1, priorityMap, rmContainer, container); node_1.reserveResource(app_0, priorityMap, rmContainer); res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, - priorityMap, capability, capability); + priorityMap, capability); assertFalse(res); } @@ -858,11 +858,13 @@ public class TestReservations { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); + ResourceLimits limits = new ResourceLimits(clusterResource); boolean res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); + // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then @@ -879,36 +881,42 @@ public class TestReservations { assertEquals(3 * GB, node_1.getUsedResource().getMemory()); capability = Resources.createResource(5 * GB, 0); + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources .createResource(5 * GB)); assertTrue(res); + // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to + // unreserve 2GB to get the total 5GB needed. + // also note vcore checks not enabled + assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve()); // tell to not check reservations + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); + refreshQueuesTurnOffReservationsContLook(a, csConf); - // should return false no matter what checkReservations is passed - // in since feature is off + // should return false since reservations continue look is off. + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources.none()); + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none()); assertFalse(res); - + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits( - clusterResource), capability, Resources + CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources .createResource(5 * GB)); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); } public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, @@ -1056,22 +1064,31 @@ public class TestReservations { assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); - // set limit so subtrace reservations it can continue - Resource limit = Resources.createResource(12 * GB, 0); - boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, - true, null); + // not over the limit + Resource limit = Resources.createResource(14 * GB, 0); + ResourceLimits userResourceLimits = new ResourceLimits(clusterResource); + boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); assertTrue(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); - // tell it not to check for reservations and should fail as already over - // limit - res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null); - assertFalse(res); + // set limit so it subtracts reservations and it can continue + limit = Resources.createResource(12 * GB, 0); + userResourceLimits = new ResourceLimits(clusterResource); + res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); + assertTrue(res); + // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit + // we need to unreserve 1GB + // also note vcore checks not enabled + assertEquals(Resources.createResource(1 * GB, 4), + userResourceLimits.getAmountNeededUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); + userResourceLimits = new ResourceLimits(clusterResource); // should now return false since feature off - res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null); + res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits); assertFalse(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); } @Test