diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fe280c49bf1..73bcaf0b4bc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -641,6 +641,9 @@ Release 2.7.0 - UNRELEASED YARN-3270. Fix node label expression not getting set in ApplicationSubmissionContext (Rohit Agarwal via wangda) + YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's + available resource-limit from the parent queue. (Wangda Tan via vinodkv) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java new file mode 100644 index 00000000000..12333e877b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Resource limits for queues/applications, this means max overall (please note + * that, it's not "extra") resource you can get. + */ +public class ResourceLimits { + public ResourceLimits(Resource limit) { + this.limit = limit; + } + + volatile Resource limit; + public Resource getLimit() { + return limit; + } + + public void setLimit(Resource limit) { + this.limit = limit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index c651878a996..de44bbe4976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -50,11 +50,12 @@ public ResourceUsage() { writeLock = lock.writeLock(); usages = new HashMap(); + usages.put(NL, new UsageByLabel(NL)); } // Usage enum here to make implement cleaner private enum ResourceType { - USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4); + USED(0), PENDING(1), AMUSED(2), RESERVED(3); private int idx; @@ -71,7 +72,18 @@ public UsageByLabel(String label) { resArr = new Resource[ResourceType.values().length]; for (int i = 0; i < resArr.length; i++) { resArr[i] = Resource.newInstance(0, 0); - } + }; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + resArr[0] + "%, "); + sb.append("pending=" + resArr[1] + "%, "); + sb.append("am_used=" + resArr[2] + "%, "); + sb.append("reserved=" + resArr[3] + "%, "); + sb.append("headroom=" + resArr[4] + "%}"); + return sb.toString(); } } @@ -180,41 +192,6 @@ public void setReserved(String label, Resource res) { _set(label, ResourceType.RESERVED, res); } - /* - * Headroom - */ - public Resource getHeadroom() { - return getHeadroom(NL); - } - - public Resource getHeadroom(String label) { - return _get(label, ResourceType.HEADROOM); - } - - public void incHeadroom(String label, Resource res) { - _inc(label, ResourceType.HEADROOM, res); - } - - public void incHeadroom(Resource res) { - incHeadroom(NL, res); - } - - public void decHeadroom(Resource res) { - decHeadroom(NL, res); - } - - public void decHeadroom(String label, Resource res) { - _dec(label, ResourceType.HEADROOM, res); - } - - public void setHeadroom(Resource res) { - setHeadroom(NL, res); - } - - public void setHeadroom(String label, Resource res) { - _set(label, ResourceType.HEADROOM, res); - } - /* * AM-Used */ @@ -309,4 +286,14 @@ private void _dec(String label, ResourceType type, Resource res) { writeLock.unlock(); } } + + @Override + public String toString() { + try { + readLock.lock(); + return usages.toString(); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index eb7218b6ec0..d8007097a20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -40,9 +40,11 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; @@ -52,7 +54,7 @@ public abstract class AbstractCSQueue implements CSQueue { final String queueName; volatile int numContainers; - Resource minimumAllocation; + final Resource minimumAllocation; Resource maximumAllocation; QueueState state; final QueueMetrics metrics; @@ -94,6 +96,7 @@ public AbstractCSQueue(CapacitySchedulerContext cs, cs.getConf()); this.csContext = cs; + this.minimumAllocation = csContext.getMinimumResourceCapability(); // initialize ResourceUsage queueUsage = new ResourceUsage(); @@ -248,7 +251,6 @@ synchronized void setupQueueConfigs(Resource clusterResource) // After we setup labels, we can setup capacities setupConfigurableCapacities(); - this.minimumAllocation = csContext.getMinimumResourceCapability(); this.maximumAllocation = csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); @@ -403,4 +405,22 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) { return csConf.getPreemptionDisabled(q.getQueuePath(), parentQ.getPreemptionDisabled()); } + + protected Resource getCurrentResourceLimit(Resource clusterResource, + ResourceLimits currentResourceLimits) { + /* + * Queue's max available resource = min(my.max, my.limit) + * my.limit is set by my parent, considered used resource of my siblings + */ + Resource queueMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource, + queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); + Resource queueCurrentResourceLimit = + Resources.min(resourceCalculator, clusterResource, queueMaxResource, + currentResourceLimits.getLimit()); + queueCurrentResourceLimit = + Resources.roundDown(resourceCalculator, queueCurrentResourceLimit, + minimumAllocation); + return queueCurrentResourceLimit; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 5cf38c19ae3..0a60acc9fdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -189,10 +190,12 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, * @param clusterResource the resource of the cluster. * @param node node on which resources are available * @param needToUnreserve assign container only if it can unreserve one first + * @param resourceLimits how much overall resource of this queue can use. * @return the assignment */ - public CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve); + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits resourceLimits); /** * A container assigned to the queue has completed. @@ -231,8 +234,10 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) /** * Update the cluster resource for queues as we add/remove nodes * @param clusterResource the current cluster resource + * @param resourceLimits the current ResourceLimits */ - public void updateClusterResource(Resource clusterResource); + public void updateClusterResource(Resource clusterResource, + ResourceLimits resourceLimits); /** * Get the {@link ActiveUsersManager} for the queue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 865b0b41979..1921195c220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -225,52 +225,4 @@ public static void updateQueueStatistics( ) ); } - - public static float getAbsoluteMaxAvailCapacity( - ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { - CSQueue parent = queue.getParent(); - if (parent == null) { - return queue.getAbsoluteMaximumCapacity(); - } - - //Get my parent's max avail, needed to determine my own - float parentMaxAvail = getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, parent); - //...and as a resource - Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); - - //check for no resources parent before dividing, if so, max avail is none - if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { - return 0.0f; - } - //sibling used is parent used - my used... - float siblingUsedCapacity = Resources.ratio( - resourceCalculator, - Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), - parentResource); - //my max avail is the lesser of my max capacity and what is unused from my parent - //by my siblings (if they are beyond their base capacity) - float maxAvail = Math.min( - queue.getMaximumCapacity(), - 1.0f - siblingUsedCapacity); - //and, mutiply by parent to get absolute (cluster relative) value - float absoluteMaxAvail = maxAvail * parentMaxAvail; - - if (LOG.isDebugEnabled()) { - LOG.debug("qpath " + queue.getQueuePath()); - LOG.debug("parentMaxAvail " + parentMaxAvail); - LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); - LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); - LOG.debug("maxAvail " + maxAvail); - LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); - } - - if (absoluteMaxAvail < 0.0f) { - absoluteMaxAvail = 0.0f; - } else if (absoluteMaxAvail > 1.0f) { - absoluteMaxAvail = 1.0f; - } - - return absoluteMaxAvail; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index f79d19507eb..c6524c6fd5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -26,32 +26,32 @@ public class CapacityHeadroomProvider { LeafQueue queue; FiCaSchedulerApp application; Resource required; - LeafQueue.QueueHeadroomInfo queueHeadroomInfo; + LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; public CapacityHeadroomProvider( LeafQueue.User user, LeafQueue queue, FiCaSchedulerApp application, Resource required, - LeafQueue.QueueHeadroomInfo queueHeadroomInfo) { + LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) { this.user = user; this.queue = queue; this.application = application; this.required = required; - this.queueHeadroomInfo = queueHeadroomInfo; + this.queueResourceLimitsInfo = queueResourceLimitsInfo; } public Resource getHeadroom() { - Resource queueMaxCap; + Resource queueCurrentLimit; Resource clusterResource; - synchronized (queueHeadroomInfo) { - queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); - clusterResource = queueHeadroomInfo.getClusterResource(); + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + clusterResource = queueResourceLimitsInfo.getClusterResource(); } - Resource headroom = queue.getHeadroom(user, queueMaxCap, + Resource headroom = queue.getHeadroom(user, queueCurrentLimit, clusterResource, application, required); // Corner case to deal with applications being slightly over-limit diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6b9d8460b23..28ce26448e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -33,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -84,12 +85,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -112,11 +117,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; - @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -499,7 +499,8 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) initializeQueueMappings(); // Re-calculate headroom for active applications - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); labelManager.reinitializeQueueLabels(getQueueToLabels()); setQueueAcls(authorizer, queues); @@ -990,7 +991,8 @@ private synchronized void nodeUpdate(RMNode nm) { private synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); } /** @@ -1060,7 +1062,8 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); CSAssignment assignment = queue.assignContainers(clusterResource, node, - false); + false, new ResourceLimits( + clusterResource)); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -1084,7 +1087,8 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node, false); + root.assignContainers(clusterResource, node, false, new ResourceLimits( + clusterResource)); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1205,7 +1209,8 @@ private synchronized void addNode(RMNode nodeManager) { usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); int numNodes = numNodeManagers.incrementAndGet(); updateMaximumAllocation(schedulerNode, true); @@ -1234,7 +1239,8 @@ private synchronized void removeNode(RMNode nodeInfo) { return; } Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); int numNodes = numNodeManagers.decrementAndGet(); if (scheduleAsynchronously && numNodes == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 38d4712de09..3910ac87530 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -115,7 +116,10 @@ public class LeafQueue extends AbstractCSQueue { // absolute capacity as a resource (based on cluster resource) private Resource absoluteCapacityResource = Resources.none(); - private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final QueueResourceLimitsInfo queueResourceLimitsInfo = + new QueueResourceLimitsInfo(); + + private volatile ResourceLimits currentResourceLimits = null; public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -145,13 +149,14 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); + this.currentResourceLimits = new ResourceLimits(clusterResource); + // Initialize headroom info, also used for calculating application // master resource limits. Since this happens during queue initialization // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -544,12 +549,12 @@ public synchronized Resource getAMResourceLimit() { * become busy. * */ - Resource queueMaxCap; - synchronized (queueHeadroomInfo) { - queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + Resource queueCurrentLimit; + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); } Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, - absoluteCapacityResource, queueMaxCap); + absoluteCapacityResource, queueCurrentLimit); return Resources.multiplyAndNormalizeUp( resourceCalculator, queueCap, @@ -733,8 +738,10 @@ private static Set getRequestLabelSetByExpression( @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve) { - + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits currentResourceLimits) { + this.currentResourceLimits = currentResourceLimits; + if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " #applications=" + activeApplications.size()); @@ -876,9 +883,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } - private synchronized CSAssignment - assignReservedContainer(FiCaSchedulerApp application, - FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) { + private synchronized CSAssignment assignReservedContainer( + FiCaSchedulerApp application, FiCaSchedulerNode node, + RMContainer rmContainer, Resource clusterResource) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -895,13 +902,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - protected Resource getHeadroom(User user, Resource queueMaxCap, + protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, Resource required) { - return getHeadroom(user, queueMaxCap, clusterResource, + return getHeadroom(user, queueCurrentLimit, clusterResource, computeUserLimit(application, clusterResource, required, user, null)); } - private Resource getHeadroom(User user, Resource queueMaxCap, + private Resource getHeadroom(User user, Resource currentResourceLimit, Resource clusterResource, Resource userLimit) { /** * Headroom is: @@ -923,8 +930,11 @@ private Resource getHeadroom(User user, Resource queueMaxCap, Resource headroom = Resources.min(resourceCalculator, clusterResource, Resources.subtract(userLimit, user.getUsed()), - Resources.subtract(queueMaxCap, queueUsage.getUsed()) + Resources.subtract(currentResourceLimit, queueUsage.getUsed()) ); + // Normalize it before return + headroom = + Resources.roundDown(resourceCalculator, headroom, minimumAllocation); return headroom; } @@ -1012,23 +1022,17 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, return canAssign; } - private Resource updateHeadroomInfo(Resource clusterResource, - float absoluteMaxAvailCapacity) { - - Resource queueMaxCap = - Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, - absoluteMaxAvailCapacity, - minimumAllocation); - - synchronized (queueHeadroomInfo) { - queueHeadroomInfo.setQueueMaxCap(queueMaxCap); - queueHeadroomInfo.setClusterResource(clusterResource); + private Resource computeQueueCurrentLimitAndSetHeadroomInfo( + Resource clusterResource) { + Resource queueCurrentResourceLimit = + getCurrentResourceLimit(clusterResource, currentResourceLimits); + + synchronized (queueResourceLimitsInfo) { + queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit); + queueResourceLimitsInfo.setClusterResource(clusterResource); } - - return queueMaxCap; - + + return queueCurrentResourceLimit; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) @@ -1043,28 +1047,22 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - //Max avail capacity needs to take into account usage by ancestor-siblings - //which are greater than their base capacity, so we are interested in "max avail" - //capacity - float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, this); - - Resource queueMaxCap = - updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); + Resource currentResourceLimit = + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); + getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxCap=" + queueMaxCap + + " queueMaxAvailRes=" + currentResourceLimit + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( - queueUser, this, application, required, queueHeadroomInfo); + queueUser, this, application, required, queueResourceLimitsInfo); application.setHeadroomProvider(headroomProvider); @@ -1249,7 +1247,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, application.getResourceRequest(priority, node.getNodeName()); if (nodeLocalResourceRequest != null) { assigned = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1265,8 +1263,8 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } - assigned = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + assigned = + assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1282,10 +1280,10 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } - return new CSAssignment( - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, application, priority, reservedContainer, needToUnreserve), - NodeType.OFF_SWITCH); + return new CSAssignment(assignOffSwitchContainers(clusterResource, + offSwitchResourceRequest, node, application, priority, + reservedContainer, needToUnreserve), + NodeType.OFF_SWITCH); } return SKIP_ASSIGNMENT; @@ -1373,7 +1371,7 @@ private Resource assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { - if (canAssign(application, priority, node, NodeType.NODE_LOCAL, + if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, @@ -1383,9 +1381,9 @@ private Resource assignNodeLocalContainers(Resource clusterResource, return Resources.none(); } - private Resource assignRackLocalContainers( - Resource clusterResource, ResourceRequest rackLocalResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + private Resource assignRackLocalContainers(Resource clusterResource, + ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { @@ -1397,9 +1395,9 @@ private Resource assignRackLocalContainers( return Resources.none(); } - private Resource assignOffSwitchContainers( - Resource clusterResource, ResourceRequest offSwitchResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + private Resource assignOffSwitchContainers(Resource clusterResource, + ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { @@ -1753,15 +1751,16 @@ private void updateAbsoluteCapacityResource(Resource clusterResource) { } @Override - public synchronized void updateClusterResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource, + ResourceLimits currentResourceLimits) { + this.currentResourceLimits = currentResourceLimits; lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -1951,16 +1950,16 @@ public void setMaxApplications(int maxApplications) { * Holds shared values used by all applications in * the queue to calculate headroom on demand */ - static class QueueHeadroomInfo { - private Resource queueMaxCap; + static class QueueResourceLimitsInfo { + private Resource queueCurrentLimit; private Resource clusterResource; - public void setQueueMaxCap(Resource queueMaxCap) { - this.queueMaxCap = queueMaxCap; + public void setQueueCurrentLimit(Resource currentLimit) { + this.queueCurrentLimit = currentLimit; } - public Resource getQueueMaxCap() { - return queueMaxCap; + public Resource getQueueCurrentLimit() { + return queueCurrentLimit; } public void setClusterResource(Resource clusterResource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a26b0aa7924..7feaa152fbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -378,8 +379,9 @@ private synchronized void removeApplication(ApplicationId applicationId, } @Override - public synchronized CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits resourceLimits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); Set nodeLabels = node.getLabels(); @@ -408,7 +410,8 @@ public synchronized CSAssignment assignContainers( // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve); + assignContainersToChildQueues(clusterResource, node, + localNeedToUnreserve | needToUnreserve, resourceLimits); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -530,8 +533,29 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { node.getAvailableResource(), minimumAllocation); } - private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, - FiCaSchedulerNode node, boolean needToUnreserve) { + private ResourceLimits getResourceLimitsOfChild(CSQueue child, + Resource clusterResource, ResourceLimits myLimits) { + /* + * Set head-room of a given child, limit = + * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used + * + child.used. To avoid any of this queue's and its ancestors' limit + * being violated + */ + Resource myCurrentLimit = + getCurrentResourceLimit(clusterResource, myLimits); + // My available resource = my-current-limit - my-used-resource + Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit, + getUsedResources()); + // Child's limit = my-available-resource + resource-already-used-by-child + Resource childLimit = + Resources.add(myMaxAvailableResource, child.getUsedResources()); + + return new ResourceLimits(childLimit); + } + + private synchronized CSAssignment assignContainersToChildQueues( + Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits limits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -544,7 +568,14 @@ private synchronized CSAssignment assignContainersToChildQueues(Resource cluster LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - assignment = childQueue.assignContainers(cluster, node, needToUnreserve); + + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = + getResourceLimitsOfChild(childQueue, cluster, limits); + + assignment = + childQueue.assignContainers(cluster, node, needToUnreserve, + childLimits); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + @@ -638,10 +669,14 @@ public void completedContainer(Resource clusterResource, } @Override - public synchronized void updateClusterResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource, + ResourceLimits resourceLimits) { // Update all children for (CSQueue childQueue : childQueues) { - childQueue.updateClusterResource(clusterResource); + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = + getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); + childQueue.updateClusterResource(clusterResource, childLimits); } // Update metrics @@ -728,4 +763,4 @@ public void detachContainer(Resource clusterResource, public synchronized int getNumApplications() { return numApplications; } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index e1b8a3d95f6..494f5a412f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,14 +23,12 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -53,6 +52,7 @@ public class MockAM { private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; + private volatile AllocateResponse lastResponse; private final List requests = new ArrayList(); private final List releases = new ArrayList(); @@ -223,7 +223,8 @@ public AllocateResponse allocate(AllocateRequest allocateRequest) context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - return doAllocateAs(ugi, allocateRequest); + lastResponse = doAllocateAs(ugi, allocateRequest); + return lastResponse; } public AllocateResponse doAllocateAs(UserGroupInformation ugi, @@ -240,6 +241,10 @@ public AllocateResponse run() throws Exception { throw (Exception) e.getCause(); } } + + public AllocateResponse doHeartbeat() throws Exception { + return allocate(null, null); + } public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java index b6dfacb7028..f0bf8925049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java @@ -38,7 +38,7 @@ public class TestResourceUsage { @Parameterized.Parameters public static Collection getParameters() { return Arrays.asList(new String[][] { { "Pending" }, { "Used" }, - { "Headroom" }, { "Reserved" }, { "AMUsed" } }); + { "Reserved" }, { "AMUsed" } }); } public TestResourceUsage(String suffix) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 81a5aad955b..8cad05725ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -21,15 +21,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.mockito.Matchers; -import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; @@ -42,8 +37,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; @@ -53,9 +48,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -63,7 +59,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.Ignore; +import org.mockito.Matchers; +import org.mockito.Mockito; public class TestApplicationLimits { @@ -171,7 +168,9 @@ public void testAMResourceLimit() throws Exception { // am limit is 4G initially (based on the queue absolute capacity) // when there is only 1 user, and drops to 2G (the userlimit) when there // is a second user - queue.updateClusterResource(Resource.newInstance(80 * GB, 40)); + Resource clusterResource = Resource.newInstance(80 * GB, 40); + queue.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class); when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); @@ -289,7 +288,8 @@ public void testLimitsComputation() throws Exception { // Add some nodes to the cluster & test new limits clusterResource = Resources.createResource(120 * 16 * GB); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1)); assertEquals(queue.getUserAMResourceLimit(), @@ -611,7 +611,8 @@ public void testHeadroom() throws Exception { app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -630,7 +631,8 @@ public void testHeadroom() throws Exception { app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change @@ -649,7 +651,8 @@ public void testHeadroom() throws Exception { app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); @@ -657,7 +660,8 @@ public void testHeadroom() throws Exception { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java deleted file mode 100644 index 5135ba9be9d..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Test; - -public class TestCSQueueUtils { - - private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class); - - final static int GB = 1024; - - @Test - public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception { - runInvalidDivisorTest(false); - runInvalidDivisorTest(true); - } - - public void runInvalidDivisorTest(boolean useDominant) throws Exception { - - ResourceCalculator resourceCalculator; - Resource clusterResource; - if (useDominant) { - resourceCalculator = new DominantResourceCalculator(); - clusterResource = Resources.createResource(10, 0); - } else { - resourceCalculator = new DefaultResourceCalculator(); - clusterResource = Resources.createResource(0, 99); - } - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(0, 0)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 32)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root), 0.000001f); - - assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 32)); - - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - final String L1Q2 = "L1Q2"; - final String L2Q1 = "L2Q1"; - final String L2Q2 = "L2Q2"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2, - L2Q1, L2Q2}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 80); - csConf.setMaximumCapacity(L1Q1P, 80); - - final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2; - csConf.setCapacity(L1Q2P, 20); - csConf.setMaximumCapacity(L1Q2P, 100); - - final String L2Q1P = L1Q1P + "." + L2Q1; - csConf.setCapacity(L2Q1P, 50); - csConf.setMaximumCapacity(L2Q1P, 50); - - final String L2Q2P = L1Q1P + "." + L2Q2; - csConf.setCapacity(L2Q2P, 50); - csConf.setMaximumCapacity(L2Q2P, 50); - - float result; - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null); - LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null); - LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null); - - //no usage, all based on maxCapacity (prior behavior) - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //some usage, but below the base capacity - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //usage gt base on parent sibling - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //same as last, but with usage also on direct parent - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, below the threshold of effect at present - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, now above the threshold of effect - //(it's cumulative with prior tests) - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.1f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fabf47de467..83ab1046288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -359,7 +360,8 @@ private void nodeUpdate( resourceManager.getResourceScheduler().handle(nodeUpdate); } - private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); @@ -383,6 +385,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { conf.setUserLimitFactor(B3, 100.0f); LOG.info("Setup top-level queues a and b"); + return conf; } @Test @@ -2400,6 +2403,86 @@ public void testRefreshQueuesMaxAllocationCSLarger() throws Exception { assertEquals("queue B2 max vcores allocation", 12, ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); } + + private void waitContainerAllocated(MockAM am, int mem, int nContainer, + int startContainerId, MockRM rm, MockNM nm) throws Exception { + for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) { + am.allocate("*", mem, 1, new ArrayList()); + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), cId); + Assert.assertTrue(rm.waitForState(nm, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + } + } + + @Test + public void testHierarchyQueuesCurrentLimits() throws Exception { + /* + * Queue tree: + * Root + * / \ + * A B + * / \ / | \ + * A1 A2 B1 B2 B3 + */ + YarnConfiguration conf = + new YarnConfiguration( + setupQueueConfiguration(new CapacitySchedulerConfiguration())); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1); + + // Maximum resoure of b1 is 100 * 0.895 * 0.792 = 71 GB + // 2 GBs used by am, so it's 71 - 2 = 69G. + Assert.assertEquals(69 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total) + waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1); + + // Allocated one more container with 1 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 3 GB (2 * 1GB containers and 1 AM container) + // Available is 100 - 41 - 3 = 56 GB + Assert.assertEquals(56 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom + // of app1 (in queue b1) updated correctly + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total) + waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1); + + // Allocated one more container with 4 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 4 GB (3 * 1GB containers and 1 AM container) + // A1 uses 25 GB (3 * 8GB containers and 1 AM container) + // Available is 100 - 41 - 4 - 25 = 30 GB + Assert.assertEquals(30 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + } private void setMaxAllocMb(Configuration conf, int maxAllocMb) { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index af58a439436..7edb17d0e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -143,7 +144,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). - when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); + when(queue) + .assignContainers(eq(clusterResource), eq(node), anyBoolean(), + any(ResourceLimits.class)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -154,7 +157,8 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); + when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), + any(ResourceLimits.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -270,14 +274,16 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); for(int i=0; i < 2; i++) { stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } for(int i=0; i < 3; i++) { @@ -285,7 +291,8 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } for(int i=0; i < 4; i++) { @@ -293,7 +300,8 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -326,7 +334,8 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -353,7 +362,8 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -379,7 +389,8 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -393,12 +404,13 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index ead5719f376..a5a2e5fe0dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -294,11 +295,13 @@ public void testInitializeQueue() throws Exception { //Verify the value for getAMResourceLimit for queues with < .1 maxcap Resource clusterResource = Resource.newInstance(50 * GB, 50); - a.updateClusterResource(clusterResource); + a.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); assertEquals(Resource.newInstance(1 * GB, 1), a.getAMResourceLimit()); - b.updateClusterResource(clusterResource); + b.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); assertEquals(Resource.newInstance(5 * GB, 1), b.getAMResourceLimit()); } @@ -347,7 +350,8 @@ public void testSingleQueueOneUserMetrics() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -482,7 +486,8 @@ public void testSingleQueueWithOneUser() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -492,7 +497,8 @@ public void testSingleQueueWithOneUser() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -500,7 +506,8 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Can't allocate 3rd due to user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -509,7 +516,8 @@ public void testSingleQueueWithOneUser() throws Exception { // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -517,7 +525,8 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(3*GB, a.getMetrics().getAllocatedMB()); // One more should work, for app_1, due to user-limit-factor - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -527,7 +536,8 @@ public void testSingleQueueWithOneUser() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -642,19 +652,22 @@ public void testUserLimits() throws Exception { // recordFactory))); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); // One more to user_0 since he is the only active user - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); @@ -705,7 +718,8 @@ public void testComputeUserLimitAndSetHeadroom(){ assertEquals("There should only be 1 active user!", 1, qb.getActiveUsersManager().getNumActiveUsers()); //get headroom - qb.assignContainers(clusterResource, node_0, false); + qb.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), null); @@ -724,7 +738,8 @@ public void testComputeUserLimitAndSetHeadroom(){ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, u1Priority, recordFactory))); qb.submitApplicationAttempt(app_2, user_1); - qb.assignContainers(clusterResource, node_1, false); + qb.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), null); @@ -766,8 +781,10 @@ public void testComputeUserLimitAndSetHeadroom(){ u1Priority, recordFactory))); qb.submitApplicationAttempt(app_1, user_0); qb.submitApplicationAttempt(app_3, user_1); - qb.assignContainers(clusterResource, node_0, false); - qb.assignContainers(clusterResource, node_0, false); + qb.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); + qb.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3 .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(), null); @@ -785,7 +802,8 @@ public void testComputeUserLimitAndSetHeadroom(){ app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, u0Priority, recordFactory))); - qb.assignContainers(clusterResource, node_1, false); + qb.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), null); @@ -857,7 +875,8 @@ public void testUserHeadroomMultiApp() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -873,7 +892,8 @@ public void testUserHeadroomMultiApp() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, priority, recordFactory))); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -961,7 +981,8 @@ public void testHeadroomWithMaxCap() throws Exception { 1, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -971,7 +992,8 @@ public void testHeadroomWithMaxCap() throws Exception { // the application is not yet active // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -987,7 +1009,8 @@ public void testHeadroomWithMaxCap() throws Exception { // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -1000,7 +1023,8 @@ public void testHeadroomWithMaxCap() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @@ -1070,21 +1094,24 @@ public void testSingleQueueWithMultipleUsers() throws Exception { */ // Only 1 container - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Can't allocate 3rd due to user-limit a.setUserLimit(25); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1102,7 +1129,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1111,7 +1139,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_0 since // user_0 is at user-limit not above it - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1121,7 +1150,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1132,7 +1162,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now, allocations should goto app_3 since it's under user-limit a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(7*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1140,7 +1171,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); // Now we should assign to app_3 again since user_2 is under user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1239,7 +1271,8 @@ public void testReservation() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1249,7 +1282,8 @@ public void testReservation() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1257,7 +1291,8 @@ public void testReservation() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1273,7 +1308,8 @@ public void testReservation() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1289,7 +1325,8 @@ public void testReservation() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1356,7 +1393,8 @@ public void testStolenReservedContainer() throws Exception { // Start testing... - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1365,7 +1403,8 @@ public void testStolenReservedContainer() throws Exception { assertEquals(0*GB, a.getMetrics().getAvailableMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1378,7 +1417,8 @@ public void testStolenReservedContainer() throws Exception { // We do not need locality delay here doReturn(-1).when(a).getNodeLocalityDelay(); - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(10*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1394,7 +1434,8 @@ public void testStolenReservedContainer() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8*GB, app_1.getCurrentConsumption().getMemory()); @@ -1462,20 +1503,23 @@ public void testReservationExchange() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1489,7 +1533,8 @@ public void testReservationExchange() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1498,7 +1543,8 @@ public void testReservationExchange() throws Exception { assertEquals(1, app_1.getReReservations(priority)); // Re-reserve - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1507,7 +1553,8 @@ public void testReservationExchange() throws Exception { assertEquals(2, app_1.getReReservations(priority)); // Try to schedule on node_1 now, should *move* the reservation - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(9*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1524,7 +1571,8 @@ public void testReservationExchange() throws Exception { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - CSAssignment assignment = a.assignContainers(clusterResource, node_0, false); + CSAssignment assignment = a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1595,7 +1643,8 @@ public void testLocalityScheduling() throws Exception { CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false); + assignment = a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); @@ -1603,7 +1652,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false); + assignment = a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); @@ -1611,7 +1661,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2, false); + assignment = a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); @@ -1620,7 +1671,8 @@ public void testLocalityScheduling() throws Exception { // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - assignment = a.assignContainers(clusterResource, node_2, false); + assignment = a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset @@ -1628,7 +1680,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - assignment = a.assignContainers(clusterResource, node_0, false); + assignment = a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1636,7 +1689,8 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - assignment = a.assignContainers(clusterResource, node_1, false); + assignment = a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1664,13 +1718,15 @@ public void testLocalityScheduling() throws Exception { doReturn(1).when(a).getNodeLocalityDelay(); // Shouldn't assign RACK_LOCAL yet - assignment = a.assignContainers(clusterResource, node_3, false); + assignment = a.assignContainers(clusterResource, node_3, false, + new ResourceLimits(clusterResource)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now - assignment = a.assignContainers(clusterResource, node_3, false); + assignment = a.assignContainers(clusterResource, node_3, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1751,7 +1807,8 @@ public void testApplicationPriorityScheduling() throws Exception { // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); @@ -1763,7 +1820,8 @@ public void testApplicationPriorityScheduling() throws Exception { // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); @@ -1774,7 +1832,8 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); @@ -1785,7 +1844,8 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1796,7 +1856,8 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1872,7 +1933,8 @@ public void testSchedulingConstraints() throws Exception { app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 - a.assignContainers(clusterResource, node_0_0, false); + a.assignContainers(clusterResource, node_0_0, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1880,7 +1942,8 @@ public void testSchedulingConstraints() throws Exception { // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 - a.assignContainers(clusterResource, node_1_0, false); + a.assignContainers(clusterResource, node_1_0, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero @@ -1896,14 +1959,16 @@ public void testSchedulingConstraints() throws Exception { // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 - a.assignContainers(clusterResource, node_0_1, false); + a.assignContainers(clusterResource, node_0_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1_0, false); + a.assignContainers(clusterResource, node_1_0, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -2030,7 +2095,9 @@ public void testActivateApplicationByUpdatingClusterResource() assertEquals(2, e.activeApplications.size()); assertEquals(1, e.pendingApplications.size()); - e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32)); + Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); + e.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); // after updating cluster resource assertEquals(3, e.activeApplications.size()); @@ -2153,7 +2220,8 @@ public void testLocalityConstraints() throws Exception { // node_0_1 // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false - a.assignContainers(clusterResource, node_0_1, false); + a.assignContainers(clusterResource, node_0_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2175,7 +2243,8 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false - a.assignContainers(clusterResource, node_1_1, false); + a.assignContainers(clusterResource, node_1_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2205,7 +2274,8 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, false); + a.assignContainers(clusterResource, node_1_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2233,7 +2303,8 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, false); + a.assignContainers(clusterResource, node_1_1, false, + new ResourceLimits(clusterResource)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2259,7 +2330,8 @@ public void testLocalityConstraints() throws Exception { // Blacklist: < host_0_0 > <---- // Now, should allocate since RR(rack_1) = relax: true - a.assignContainers(clusterResource, node_1_1, false); + a.assignContainers(clusterResource, node_1_1, false, + new ResourceLimits(clusterResource)); verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2289,7 +2361,8 @@ public void testLocalityConstraints() throws Exception { // host_1_0: 8G // host_1_1: 7G - a.assignContainers(clusterResource, node_1_0, false); + a.assignContainers(clusterResource, node_1_0, false, + new ResourceLimits(clusterResource)); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2323,7 +2396,8 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() Resource newClusterResource = Resources.createResource(100 * 20 * GB, 100 * 32); - a.updateClusterResource(newClusterResource); + a.updateClusterResource(newClusterResource, + new ResourceLimits(newClusterResource)); // 100 * 20 * 0.2 = 400 assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1)); } @@ -2370,7 +2444,8 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() recordFactory))); try { - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 696ad7a1199..4f8938607e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -154,8 +155,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { - doReturn(new CSAssignment(Resources.none(), type)). - when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); + doReturn(new CSAssignment(Resources.none(), type)).when(queue) + .assignContainers(eq(clusterResource), eq(node), eq(false), + any(ResourceLimits.class)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -166,7 +168,8 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); +when(queue).assignContainers(eq(clusterResource), eq(node), eq(false), + any(ResourceLimits.class)); } private float computeQueueAbsoluteUsedCapacity(CSQueue queue, @@ -229,19 +232,21 @@ public void testSingleLevelQueues() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, false); + root.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -249,12 +254,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 2/6G while B has 2/14G stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(b, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -262,12 +268,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 4/14G stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 4*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -275,12 +282,13 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 8/14G stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, false); + root.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -441,7 +449,8 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 0*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -453,7 +462,8 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a, clusterResource, node_1, 0*GB); stubQueueAllocation(b2, clusterResource, node_1, 4*GB); stubQueueAllocation(c, clusterResource, node_1, 0*GB); - root.assignContainers(clusterResource, node_1, false); + root.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -464,14 +474,15 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a1, clusterResource, node_0, 1*GB); stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -490,16 +501,17 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b3, clusterResource, node_2, 1*GB); stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); - root.assignContainers(clusterResource, node_2, false); + root.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -599,7 +611,8 @@ public void testOffSwitchScheduling() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -607,12 +620,13 @@ public void testOffSwitchScheduling() throws Exception { // also, B gets a scheduling opportunity since A allocates RACK_LOCAL stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1, false); + root.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -621,12 +635,13 @@ public void testOffSwitchScheduling() throws Exception { // However, since B returns off-switch, A won't get an opportunity stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -665,7 +680,8 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // Simulate B3 returning a container on node_0 stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); verifyQueueMetrics(b2, 0*GB, clusterResource); verifyQueueMetrics(b3, 1*GB, clusterResource); @@ -673,12 +689,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1, false); + root.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -687,12 +704,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // However, since B3 returns off-switch, B2 won't get an opportunity stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); @@ -774,4 +792,8 @@ public void testQueueAcl() throws Exception { @After public void tearDown() throws Exception { } + + private ResourceLimits anyResourceLimits() { + return any(ResourceLimits.class); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 985609e0089..4c6b25f1f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -262,7 +263,8 @@ public void testReservation() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -273,7 +275,8 @@ public void testReservation() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -284,7 +287,8 @@ public void testReservation() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -298,7 +302,8 @@ public void testReservation() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -313,7 +318,8 @@ public void testReservation() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -329,7 +335,8 @@ public void testReservation() throws Exception { // node_1 heartbeat and unreserves from node_0 in order to allocate // on node_1 - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -411,7 +418,8 @@ public void testReservationNoContinueLook() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -422,7 +430,8 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -433,7 +442,8 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -447,7 +457,8 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -462,7 +473,8 @@ public void testReservationNoContinueLook() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -478,7 +490,8 @@ public void testReservationNoContinueLook() throws Exception { // node_1 heartbeat and won't unreserve from node_0, potentially stuck // if AM doesn't handle - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(18 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -552,7 +565,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -562,7 +576,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -572,7 +587,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -585,7 +601,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -599,7 +616,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception { assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); // could allocate but told need to unreserve first - a.assignContainers(clusterResource, node_1, true); + a.assignContainers(clusterResource, node_1, true, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -792,7 +810,8 @@ public void testAssignToQueue() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -802,7 +821,8 @@ public void testAssignToQueue() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -812,7 +832,8 @@ public void testAssignToQueue() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -833,7 +854,8 @@ public void testAssignToQueue() throws Exception { // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -966,7 +988,8 @@ public void testAssignToUser() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -976,7 +999,8 @@ public void testAssignToUser() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -986,7 +1010,8 @@ public void testAssignToUser() throws Exception { assertEquals(0 * GB, node_1.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -999,7 +1024,8 @@ public void testAssignToUser() throws Exception { // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(5 * GB, app_0.getCurrentReservation().getMemory()); @@ -1096,7 +1122,8 @@ public void testReservationsNoneAvailable() throws Exception { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(2 * GB, a.getUsedResources().getMemory()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1107,7 +1134,8 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(5 * GB, a.getUsedResources().getMemory()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1118,7 +1146,8 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, false); + a.assignContainers(clusterResource, node_1, false, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1132,7 +1161,8 @@ public void testReservationsNoneAvailable() throws Exception { // try to assign reducer (5G on node 0), but tell it // it has to unreserve. No room to allocate and shouldn't reserve // since nothing currently reserved. - a.assignContainers(clusterResource, node_0, true); + a.assignContainers(clusterResource, node_0, true, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1146,7 +1176,8 @@ public void testReservationsNoneAvailable() throws Exception { // try to assign reducer (5G on node 2), but tell it // it has to unreserve. Has room but shouldn't reserve // since nothing currently reserved. - a.assignContainers(clusterResource, node_2, true); + a.assignContainers(clusterResource, node_2, true, + new ResourceLimits(clusterResource)); assertEquals(8 * GB, a.getUsedResources().getMemory()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1158,7 +1189,8 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(0 * GB, node_2.getUsedResource().getMemory()); // let it assign 5G to node_2 - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); assertEquals(13 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1170,7 +1202,8 @@ public void testReservationsNoneAvailable() throws Exception { assertEquals(5 * GB, node_2.getUsedResource().getMemory()); // reserve 8G node_0 - a.assignContainers(clusterResource, node_0, false); + a.assignContainers(clusterResource, node_0, false, + new ResourceLimits(clusterResource)); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); @@ -1184,7 +1217,8 @@ public void testReservationsNoneAvailable() throws Exception { // try to assign (8G on node 2). No room to allocate, // continued to try due to having reservation above, // but hits queue limits so can't reserve anymore. - a.assignContainers(clusterResource, node_2, false); + a.assignContainers(clusterResource, node_2, false, + new ResourceLimits(clusterResource)); assertEquals(21 * GB, a.getUsedResources().getMemory()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8 * GB, a.getMetrics().getReservedMB());