diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 925cefb0db0..f55f2f64e6a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -235,6 +235,9 @@ Release 2.6.0 - UNRELEASED YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier to use protobuf object as the payload. (Junping Du via jianhe) + YARN-1769. CapacityScheduler: Improve reservations (Thomas Graves via + jlowe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index b1dfb1ec5ed..0e6207be67f 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -351,4 +351,16 @@ + + + + + + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 04c2fd51bf8..db893dc5d40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -184,10 +184,11 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. * @param node node on which resources are available + * @param needToUnreserve assign container only if it can unreserve one first * @return the assignment */ public CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node); + Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve); /** * A container assigned to the queue has completed. @@ -200,11 +201,13 @@ public CSAssignment assignContainers( * container * @param childQueue CSQueue to reinsert in childQueues * @param event event to be sent to the container + * @param sortQueues indicates whether it should re-sort the queues */ public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer container, ContainerStatus containerStatus, - RMContainerEventType event, CSQueue childQueue); + RMContainerEventType event, CSQueue childQueue, + boolean sortQueues); /** * Get the number of applications in the queue. 
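Before the scheduler diffs below, a compact model of the policy this patch threads through the code as needToUnreserve may help. This is a standalone sketch with hypothetical names, not code from the patch: a request that would breach a queue or user limit is still allowed to proceed when subtracting the application's current reservations would bring usage back under that limit, in which case the caller must unreserve a matching container first.

// Standalone sketch (hypothetical names, not part of this patch) of the
// "continue looking past reservations" limit check that the LeafQueue
// changes below add to assignToQueue()/assignToUser().
public class ContinueLookingCheck {

  /** Returns true if the request fits, possibly by unreserving first. */
  static boolean canAssign(long used, long required, long limit,
      long appReserved, boolean continueLooking) {
    if (used + required <= limit) {
      return true; // fits outright
    }
    // Over the limit: optionally allow the assignment anyway when swapping
    // out this application's reserved resources would make room; the caller
    // then proceeds in needToUnreserve mode and must unreserve first.
    return continueLooking && (used + required - appReserved <= limit);
  }

  public static void main(String[] args) {
    // 90 of 100 used: a 20-unit request normally fails, but passes when the
    // app already holds 15 reserved units it could give back.
    System.out.println(canAssign(90, 20, 100, 15, false)); // false
    System.out.println(canAssign(90, 20, 100, 15, true));  // true
  }
}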
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bdfc8192469..d847579b674 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -516,13 +516,13 @@ static CSQueue parseQueue( "Queue configuration missing child queue names for " + queueName); } queue = - new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(queue); } else { ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(parentQueue); @@ -922,7 +922,8 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = queue.assignContainers(clusterResource, node); + CSAssignment assignment = queue.assignContainers(clusterResource, node, + false); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -933,7 +934,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulerUtils.createAbnormalContainerStatus( container.getId(), SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null); + RMContainerEventType.RELEASED, null, true); } } @@ -946,7 +947,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node); + root.assignContainers(clusterResource, node, false); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1122,7 +1123,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(clusterResource, application, node, - rmContainer, containerStatus, event, null); + rmContainer, containerStatus, event, null, true); LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node @@ -1138,7 +1139,7 @@ public FiCaSchedulerApp getApplicationAttempt( } @Lock(Lock.NoLock.class) - FiCaSchedulerNode getNode(NodeId nodeId) { + public FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index af6bdc301ca..5542ef3e6fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -81,6 +81,13 @@ public class CapacitySchedulerConfiguration extends Configuration { @Private public static final String STATE = "state"; + + @Private + public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX + + "reservations-continue-look-all-nodes"; + + @Private + public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -308,6 +315,17 @@ public QueueState getState(String queue) { QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING; } + /* + * Returns whether we should continue to look at all heartbeating nodes even + * after the reservation limit was hit. The node heartbeating in could + * satisfy the request and thus be a better pick than waiting for the + * reservation to be fulfilled. This config is refreshable. + */ + public boolean getReservationContinueLook() { + return getBoolean(RESERVE_CONT_LOOK_ALL_NODES, + DEFAULT_RESERVE_CONT_LOOK_ALL_NODES); + } + private static String getAclKey(QueueACL acl) { return "acl_" + acl.toString().toLowerCase(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index a3dbc35ad3a..03a1cb65aac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -21,10 +21,12 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -55,4 +57,6 @@ public interface CapacitySchedulerContext { ResourceCalculator getResourceCalculator(); Comparator getQueueComparator(); + + FiCaSchedulerNode getNode(NodeId nodeId); } diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c93c5fc026..cdb6553c923 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -129,6 +130,8 @@ public class LeafQueue implements CSQueue { private final ResourceCalculator resourceCalculator; + private boolean reservationsContinueLooking; + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) { this.scheduler = cs; @@ -202,8 +205,9 @@ public LeafQueue(CapacitySchedulerContext cs, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs - .getConfiguration().getNodeLocalityDelay()); + maxActiveApplications, maxActiveApplicationsPerUser, state, acls, + cs.getConfiguration().getNodeLocalityDelay(), + cs.getConfiguration().getReservationContinueLook()); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -225,7 +229,8 @@ private synchronized void setupQueueConfigs( int maxApplications, float maxAMResourcePerQueuePercent, int maxApplicationsPerUser, int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, - Map acls, int nodeLocalityDelay) + Map acls, int nodeLocalityDelay, + boolean continueLooking) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); @@ -257,6 +262,7 @@ private synchronized void setupQueueConfigs( this.queueInfo.setQueueState(this.state); this.nodeLocalityDelay = nodeLocalityDelay; + this.reservationsContinueLooking = continueLooking; StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { @@ -321,7 +327,9 @@ resourceCalculator, this, getParent(), clusterResource, " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + - "nodeLocalityDelay = " + nodeLocalityDelay + "\n"); + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + + "reservationsContinueLooking = " + + reservationsContinueLooking + "\n"); } @Override @@ -555,6 +563,11 @@ public int getNodeLocalityDelay() { return nodeLocalityDelay; } + @Private + boolean getReservationContinueLooking() { + return reservationsContinueLooking; + } + public String toString() { return queueName + ": " + "capacity=" + 
capacity + ", " + @@ -613,7 +626,8 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, - newlyParsedLeafQueue.getNodeLocalityDelay()); + newlyParsedLeafQueue.getNodeLocalityDelay(), + newlyParsedLeafQueue.reservationsContinueLooking); // queue metrics are updated, more resource may be available // activate the pending applications if possible @@ -802,8 +816,8 @@ private synchronized FiCaSchedulerApp getApplication( private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); @Override - public synchronized CSAssignment - assignContainers(Resource clusterResource, FiCaSchedulerNode node) { + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() @@ -848,9 +862,17 @@ private synchronized FiCaSchedulerApp getApplication( Resource required = anyRequest.getCapability(); // Do we need containers at this 'priority'? - if (!needContainers(application, priority, required)) { + if (application.getTotalRequiredResources(priority) <= 0) { continue; } + if (!this.reservationsContinueLooking) { + if (!needContainers(application, priority, required)) { + if (LOG.isDebugEnabled()) { + LOG.debug("doesn't need containers based on reservation algo!"); + } + continue; + } + } // Compute user-limit & set headroom // Note: We compute both user-limit & headroom with the highest @@ -862,14 +884,14 @@ private synchronized FiCaSchedulerApp getApplication( required); // Check queue max-capacity limit - if (!assignToQueue(clusterResource, required)) { + if (!assignToQueue(clusterResource, required, application, true)) { return NULL_ASSIGNMENT; } // Check user limit - if (!assignToUser( - clusterResource, application.getUser(), userLimit)) { - break; + if (!assignToUser(clusterResource, application.getUser(), userLimit, + application, true)) { + break; } // Inform the application it is about to get a scheduling opportunity @@ -878,7 +900,7 @@ private synchronized FiCaSchedulerApp getApplication( // Try to schedule CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, - null); + null, needToUnreserve); // Did the application skip this node? if (assignment.getSkipped()) { @@ -900,6 +922,9 @@ private synchronized FiCaSchedulerApp getApplication( // otherwise the app will be delayed for each non-local assignment. // This helps apps with many off-cluster requests schedule faster. if (assignment.getType() != NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + LOG.debug("Resetting scheduling opportunities"); + } application.resetSchedulingOpportunities(priority); } @@ -935,22 +960,57 @@ private synchronized FiCaSchedulerApp getApplication( // Try to assign if we have sufficient resources assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + rmContainer, false); // Doesn't matter... 
since it's already charged for at time of reservation // "re-reservation" is *free* return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - private synchronized boolean assignToQueue(Resource clusterResource, - Resource required) { + + @Private + protected synchronized boolean assignToQueue(Resource clusterResource, + Resource required, FiCaSchedulerApp application, + boolean checkReservations) { + + Resource potentialTotalResource = Resources.add(usedResources, required); // Check how much of the cluster's absolute capacity we are currently using... - float potentialNewCapacity = - Resources.divide( - resourceCalculator, clusterResource, - Resources.add(usedResources, required), - clusterResource); + float potentialNewCapacity = Resources.divide(resourceCalculator, + clusterResource, potentialTotalResource, clusterResource); if (potentialNewCapacity > absoluteMaxCapacity) { + // if enabled, check to see if we could potentially use this node instead + // of a reserved node if the application has reserved containers + if (this.reservationsContinueLooking && checkReservations) { + + float potentialNewWithoutReservedCapacity = Resources.divide( + resourceCalculator, + clusterResource, + Resources.subtract(potentialTotalResource, + application.getCurrentReservation()), + clusterResource); + + if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + + getQueueName() + + " usedResources: " + + usedResources + + " clusterResources: " + + clusterResource + + " reservedResources: " + + application.getCurrentReservation() + + " currentCapacity " + + Resources.divide(resourceCalculator, clusterResource, + usedResources, clusterResource) + " required " + required + + " potentialNewWithoutReservedCapacity: " + + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + + absoluteMaxCapacity + ")"); + } + // we could potentially use this node instead of reserved node + return true; + } + + } if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " usedResources: " + usedResources @@ -966,6 +1026,8 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } + + @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { @@ -1085,25 +1147,43 @@ private Resource computeUserLimit(FiCaSchedulerApp application, return limit; } - private synchronized boolean assignToUser(Resource clusterResource, - String userName, Resource limit) { + @Private + protected synchronized boolean assignToUser(Resource clusterResource, + String userName, Resource limit, FiCaSchedulerApp application, + boolean checkReservations) { User user = getUser(userName); - + // Note: We aren't considering the current request since there is a fixed + // overhead of the AM, but it's a > check, not a >= check, so...
- if (Resources.greaterThan(resourceCalculator, clusterResource, - user.getConsumedResources(), limit)) { + if (Resources.greaterThan(resourceCalculator, clusterResource, + user.getConsumedResources(), limit)) { + + // if enabled, check to see if we could potentially use this node instead + // of a reserved node if the application has reserved containers + if (this.reservationsContinueLooking && checkReservations) { + if (Resources.lessThanOrEqual( + resourceCalculator, + clusterResource, + Resources.subtract(user.getConsumedResources(), + application.getCurrentReservation()), limit)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit based on reservations - " + " consumed: " + + user.getConsumedResources() + " reserved: " + + application.getCurrentReservation() + " limit: " + limit); + } + return true; + } + } if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() + - " will exceed limit - " + - " consumed: " + user.getConsumedResources() + - " limit: " + limit - ); + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit - " + " consumed: " + + user.getConsumedResources() + " limit: " + limit); } return false; } - return true; } @@ -1139,7 +1219,7 @@ resourceCalculator, required, getMaximumAllocation() private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, - Priority priority, RMContainer reservedContainer) { + Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { Resource assigned = Resources.none(); @@ -1149,7 +1229,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, if (nodeLocalResourceRequest != null) { assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, application, priority, reservedContainer); + node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { return new CSAssignment(assigned, NodeType.NODE_LOCAL); @@ -1166,7 +1246,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, application, priority, reservedContainer); + node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { return new CSAssignment(assigned, NodeType.RACK_LOCAL); @@ -1183,21 +1263,99 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return new CSAssignment( assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, application, priority, reservedContainer), + node, application, priority, reservedContainer, needToUnreserve), NodeType.OFF_SWITCH); } return SKIP_ASSIGNMENT; } - private Resource assignNodeLocalContainers( - Resource clusterResource, ResourceRequest nodeLocalResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, - Priority priority, RMContainer reservedContainer) { + @Private + protected boolean findNodeToUnreserve(Resource clusterResource, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + Resource capability) { + // need to unreserve some other container first + NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability); + if (idToUnreserve == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("checked to
see if we could unreserve for app but nothing " + + "reserved that matches for this app"); + } + return false; + } + FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve); + if (nodeToUnreserve == null) { + LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve); + return false; + } + if (LOG.isDebugEnabled()) { + LOG.debug("unreserving for app: " + application.getApplicationId() + + " on nodeId: " + idToUnreserve + + " in order to replace reserved application and place it on node: " + + node.getNodeID() + " needing: " + capability); + } + + // headroom + Resources.addTo(application.getHeadroom(), nodeToUnreserve + .getReservedContainer().getReservedResource()); + + // Make sure to not have completedContainers sort the queues here since + // we are already inside an iterator loop for the queues and this would + // cause a concurrent modification exception. + completedContainer(clusterResource, application, nodeToUnreserve, + nodeToUnreserve.getReservedContainer(), + SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve + .getReservedContainer().getContainerId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, false); + return true; + } + + @Private + protected boolean checkLimitsToReserve(Resource clusterResource, + FiCaSchedulerApp application, Resource capability, + boolean needToUnreserve) { + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } + return false; + } + + // we can't reserve if we got here based on the limit + // checks assuming we could unreserve!!! + Resource userLimit = computeUserLimitAndSetHeadroom(application, + clusterResource, capability); + + // Check queue max-capacity limit + if (!assignToQueue(clusterResource, capability, application, false)) { + if (LOG.isDebugEnabled()) { + LOG.debug("was going to reserve but hit queue limit"); + } + return false; + } + + // Check user limit + if (!assignToUser(clusterResource, application.getUser(), userLimit, + application, false)) { + if (LOG.isDebugEnabled()) { + LOG.debug("was going to reserve but hit user limit"); + } + return false; + } + return true; + } + + + private Resource assignNodeLocalContainers(Resource clusterResource, + ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer); + return assignContainer(clusterResource, node, application, priority, + nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, + needToUnreserve); } return Resources.none(); @@ -1206,11 +1364,12 @@ private Resource assignNodeLocalContainers( private Resource assignRackLocalContainers( Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer); + return assignContainer(clusterResource, node, application, priority, + rackLocalResourceRequest, NodeType.RACK_LOCAL,
reservedContainer, + needToUnreserve); } return Resources.none(); @@ -1219,11 +1378,12 @@ private Resource assignRackLocalContainers( private Resource assignOffSwitchContainers( Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer); + return assignContainer(clusterResource, node, application, priority, + offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, + needToUnreserve); } return Resources.none(); @@ -1303,14 +1463,17 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, return container; } + private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - ResourceRequest request, NodeType type, RMContainer rmContainer) { + ResourceRequest request, NodeType type, RMContainer rmContainer, + boolean needToUnreserve) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); + + " request=" + request + " type=" + type + + " needToUnreserve= " + needToUnreserve); } Resource capability = request.getCapability(); Resource available = node.getAvailableResource(); @@ -1335,6 +1498,18 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod return Resources.none(); } + // default to true since if reservation continue look feature isn't on + // needContainers is checked earlier and we wouldn't have gotten this far + boolean canAllocContainer = true; + if (this.reservationsContinueLooking) { + // based on reservations can we allocate/reserve more or do we need + // to unreserve one first + canAllocContainer = needContainers(application, priority, capability); + if (LOG.isDebugEnabled()) { + LOG.debug("can alloc container is: " + canAllocContainer); + } + } + // Can we allocate a container on this node? int availableContainers = resourceCalculator.computeAvailableContainers(available, capability); @@ -1342,8 +1517,28 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // Allocate... // Did we previously reserve containers at this 'priority'? - if (rmContainer != null){ + if (rmContainer != null) { unreserve(application, priority, node, rmContainer); + } else if (this.reservationsContinueLooking + && (!canAllocContainer || needToUnreserve)) { + // need to unreserve some other container first + boolean res = findNodeToUnreserve(clusterResource, node, application, + priority, capability); + if (!res) { + return Resources.none(); + } + } else { + // we got here by possibly ignoring queue capacity limits. If the + // parameter needToUnreserve is true it means we ignored one of those + // limits in the chance we could unreserve. If we are here we aren't + // trying to unreserve so we can't allocate anymore due to that parent + // limit. 
+ if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate, skipping"); + } + return Resources.none(); + } } // Inform the application @@ -1366,17 +1561,38 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNod return container.getResource(); } else { - // Reserve by 'charging' in advance... - reserve(application, priority, node, rmContainer, container); + // if we are allowed to allocate but this node doesn't have space, reserve it or + // if this was already a reserved container, reserve it again + if ((canAllocContainer) || (rmContainer != null)) { - LOG.info("Reserved container " + - " application attempt=" + application.getApplicationAttemptId() + - " resource=" + request.getCapability() + - " queue=" + this.toString() + - " node=" + node + - " clusterResource=" + clusterResource); + if (reservationsContinueLooking) { + // we got here by possibly ignoring parent queue capacity limits. If + // the parameter needToUnreserve is true it means we ignored one of + // those limits in the chance we could unreserve. If we are here + // we aren't trying to unreserve so we can't allocate + // anymore due to that parent limit + boolean res = checkLimitsToReserve(clusterResource, application, capability, + needToUnreserve); + if (!res) { + return Resources.none(); + } } - return request.getCapability(); + // Reserve by 'charging' in advance... + reserve(application, priority, node, rmContainer, container); + + LOG.info("Reserved container " + + " application=" + application.getApplicationId() + + " resource=" + request.getCapability() + + " queue=" + this.toString() + + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + + " used=" + usedResources + + " cluster=" + clusterResource); + + return request.getCapability(); + } + return Resources.none(); } } @@ -1402,8 +1618,8 @@ private boolean unreserve(FiCaSchedulerApp application, Priority priority, node.unreserveResource(application); // Update reserved metrics - getMetrics().unreserveResource( - application.getUser(), rmContainer.getContainer().getResource()); + getMetrics().unreserveResource(application.getUser(), + rmContainer.getContainer().getResource()); return true; } return false; @@ -1412,7 +1628,8 @@ private boolean unreserve(FiCaSchedulerApp application, Priority priority, @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) { + ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue, + boolean sortQueues) { if (application != null) { boolean removed = false; @@ -1449,7 +1666,7 @@ public void completedContainer(Resource clusterResource, if (removed) { // Inform the parent queue _outside_ of the leaf-queue lock getParent().completedContainer(clusterResource, application, node, - rmContainer, null, event, this); + rmContainer, null, event, this, sortQueues); } } } @@ -1466,6 +1683,8 @@ synchronized void allocateResource(Resource clusterResource, String userName = application.getUser(); User user = getUser(userName); user.assignContainer(resource); + // Note this is a bit unconventional since it gets the object and modifies it here + // rather than using a set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); @@
-1585,7 +1804,7 @@ public synchronized void assignContainer(Resource resource) { public synchronized void releaseContainer(Resource resource) { Resources.subtractFrom(consumed, resource); - } + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 8c654b7ded1..aa74be10ac5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -100,6 +100,8 @@ public class ParentQueue implements CSQueue { RecordFactoryProvider.getRecordFactory(null); private final ResourceCalculator resourceCalculator; + + private boolean reservationsContinueLooking; public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) { @@ -146,7 +148,8 @@ public ParentQueue(CapacitySchedulerContext cs, setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls); + maximumCapacity, absoluteMaxCapacity, state, acls, + cs.getConfiguration().getReservationContinueLook()); this.queueComparator = cs.getQueueComparator(); this.childQueues = new TreeSet(queueComparator); @@ -160,7 +163,8 @@ private synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, - QueueState state, Map acls + QueueState state, Map acls, + boolean continueLooking ) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); @@ -180,6 +184,8 @@ private synchronized void setupQueueConfigs( this.queueInfo.setMaximumCapacity(this.maximumCapacity); this.queueInfo.setQueueState(this.state); + this.reservationsContinueLooking = continueLooking; + StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); @@ -195,7 +201,8 @@ private synchronized void setupQueueConfigs( ", maxCapacity=" + maximumCapacity + ", asboluteMaxCapacity=" + absoluteMaxCapacity + ", state=" + state + - ", acls=" + aclsString); + ", acls=" + aclsString + + ", reservationsContinueLooking=" + reservationsContinueLooking); } private static float PRECISION = 0.0005f; // 0.05% precision @@ -383,7 +390,8 @@ public synchronized void reinitialize( newlyParsedParentQueue.maximumCapacity, newlyParsedParentQueue.absoluteMaxCapacity, newlyParsedParentQueue.state, - newlyParsedParentQueue.acls); + newlyParsedParentQueue.acls, + newlyParsedParentQueue.reservationsContinueLooking); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! 
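The reservationsContinueLooking flag plumbed through ParentQueue here is driven by the refreshable property added above in CapacitySchedulerConfiguration. A minimal sketch of flipping it from test or client code follows, assuming the usual "yarn.scheduler.capacity." PREFIX (so the full key is yarn.scheduler.capacity.reservations-continue-look-all-nodes); the class name is hypothetical and not part of the patch.

// Hypothetical snippet, not part of this patch: exercising the new switch.
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class ToggleContinueLook {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    System.out.println(conf.getReservationContinueLook()); // default: true
    // Same effect as setting the key in capacity-scheduler.xml.
    conf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
        false);
    System.out.println(conf.getReservationContinueLook()); // now false
  }
}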
@@ -551,7 +559,7 @@ synchronized void setMaxCapacity(float maximumCapacity) { @Override public synchronized CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node) { + Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -561,14 +569,19 @@ public synchronized CSAssignment assignContainers( + getQueueName()); } + boolean localNeedToUnreserve = false; // Are we over maximum-capacity for this queue? if (!assignToQueue(clusterResource)) { - break; + // check to see if we could assign if we unreserve first + localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource); + if (!localNeedToUnreserve) { + break; + } } // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node); + assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -632,6 +645,39 @@ private synchronized boolean assignToQueue(Resource clusterResource) { return true; } + + + private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) { + if (this.reservationsContinueLooking) { + // check to see if we could potentially use this node instead of a reserved + // node + + Resource reservedResources = Resources.createResource(getMetrics() + .getReservedMB(), getMetrics().getReservedVirtualCores()); + float capacityWithoutReservedCapacity = Resources.divide( + resourceCalculator, clusterResource, + Resources.subtract(usedResources, reservedResources), + clusterResource); + + if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { + if (LOG.isDebugEnabled()) { + LOG.debug("parent: try to use reserved: " + getQueueName() + + " usedResources: " + usedResources.getMemory() + + " clusterResources: " + clusterResource.getMemory() + + " reservedResources: " + reservedResources.getMemory() + + " currentCapacity " + ((float) usedResources.getMemory()) + / clusterResource.getMemory() + + " potentialNewWithoutReservedCapacity: " + + capacityWithoutReservedCapacity + " ( " + " max-capacity: " + + absoluteMaxCapacity + ")"); + } + // we could potentially use this node instead of reserved node + return true; + } + } + return false; + } + private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { return (node.getReservedContainer() == null) && @@ -640,7 +686,7 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { } synchronized CSAssignment assignContainersToChildQueues(Resource cluster, - FiCaSchedulerNode node) { + FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -653,7 +699,7 @@ synchronized CSAssignment assignContainersToChildQueues(Resource cluster, LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - assignment = childQueue.assignContainers(cluster, node); + assignment = childQueue.assignContainers(cluster, node, needToUnreserve); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + @@ -697,7 +743,8 @@ void printChildQueues() { public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event, CSQueue
completedChildQueue) { + RMContainerEventType event, CSQueue completedChildQueue, + boolean sortQueues) { if (application != null) { // Careful! Locking order is important! // Book keeping @@ -713,16 +760,21 @@ public void completedContainer(Resource clusterResource, " cluster=" + clusterResource); } - // reinsert the updated queue - for (Iterator iter=childQueues.iterator(); iter.hasNext();) { - CSQueue csqueue = iter.next(); - if(csqueue.equals(completedChildQueue)) - { - iter.remove(); - LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() + - " stats: " + csqueue); - childQueues.add(csqueue); - break; + // Note that this is using an iterator on the childQueues so this can't be + // called if already within an iterator for the childQueues, e.g. + // from assignContainersToChildQueues. + if (sortQueues) { + // reinsert the updated queue + for (Iterator iter=childQueues.iterator(); iter.hasNext();) { + CSQueue csqueue = iter.next(); + if(csqueue.equals(completedChildQueue)) + { + iter.remove(); + LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() + + " stats: " + csqueue); + childQueues.add(csqueue); + break; + } } } @@ -730,10 +782,15 @@ public void completedContainer(Resource clusterResource, if (parent != null) { // complete my parent parent.completedContainer(clusterResource, application, - node, rmContainer, null, event, this); + node, rmContainer, null, event, this, sortQueues); } } } + + @Private + boolean getReservationContinueLooking() { + return reservationsContinueLooking; + } synchronized void allocateResource(Resource clusterResource, Resource resource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 167dcd80e01..dc0d0f01d71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -254,5 +254,32 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, currentContPreemption, Collections.singletonList(rr), allocation.getNMTokenList()); } + + synchronized public NodeId getNodeIdToUnreserve(Priority priority, + Resource capability) { + + // first go-around, keep this algorithm simple and just grab the first + // reservation that has enough resources + Map reservedContainers = this.reservedContainers + .get(priority); + + if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { + for (Map.Entry entry : reservedContainers.entrySet()) { + // make sure we unreserve one with at least the same amount of + // resources, otherwise it could affect capacity limits + if (Resources.fitsIn(capability, entry.getValue().getContainer() + .getResource())) { + if (LOG.isDebugEnabled()) { + LOG.debug("unreserving node with reservation size: " + + entry.getValue().getContainer().getResource() + + " in order to allocate container with size: " + capability); + } + return entry.getKey(); + } + } + } + return null; + } + } diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a9a9975c544..ff8e873b15d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -516,7 +516,7 @@ public void testHeadroom() throws Exception { app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); + queue.assignContainers(clusterResource, node_0, false); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); verify(app_0_0).setHeadroom(eq(expectedHeadroom)); @@ -535,7 +535,7 @@ public void testHeadroom() throws Exception { app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change @@ -554,7 +554,7 @@ public void testHeadroom() throws Exception { app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes verify(app_0_0).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom)); @@ -562,7 +562,7 @@ public void testHeadroom() throws Exception { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes verify(app_0_0).setHeadroom(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 66ec0e6ee64..fdb9028bf9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static 
org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -141,7 +142,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). - when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -152,7 +153,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -244,7 +245,6 @@ public void testSortedQueues() throws Exception { doReturn(true).when(app_0).containerCompleted(any(RMContainer.class), any(ContainerStatus.class),any(RMContainerEventType.class)); - // Priority priority = TestUtils.createMockPriority(1); ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); @@ -269,14 +269,14 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); for(int i=0; i < 2; i++) { stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); } for(int i=0; i < 3; i++) { @@ -284,7 +284,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); } for(int i=0; i < 4; i++) { @@ -292,7 +292,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); } verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -305,7 +305,7 @@ public void testSortedQueues() throws Exception { for(int i=0; i < 3;i++) { d.completedContainer(clusterResource, app_0, node_0, - rmContainer, null, RMContainerEventType.KILL, null); + rmContainer, null, RMContainerEventType.KILL, null, true); } verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -325,7 +325,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); } verifyQueueMetrics(a, 3*GB, 
clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -336,7 +336,7 @@ public void testSortedQueues() throws Exception { //Release 1GB Container from A a.completedContainer(clusterResource, app_0, node_0, - rmContainer, null, RMContainerEventType.KILL, null); + rmContainer, null, RMContainerEventType.KILL, null, true); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -352,7 +352,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -362,7 +362,7 @@ public void testSortedQueues() throws Exception { //Release 1GB container resources from B b.completedContainer(clusterResource, app_0, node_0, - rmContainer, null, RMContainerEventType.KILL, null); + rmContainer, null, RMContainerEventType.KILL, null, true); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -378,7 +378,7 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -392,12 +392,12 @@ public void testSortedQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 083cb71acb5..092ff83fdd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -252,7 +253,7 @@ public Container answer(InvocationOnMock invocation) doNothing().when(parent).completedContainer( any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), - any(RMContainerEventType.class), any(CSQueue.class)); + any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); return queue; } @@ -325,7 +326,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -460,7 +461,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -470,7 +471,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -478,7 +479,7 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Can't allocate 3rd due to user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -487,7 +488,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -495,7 +496,7 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(3*GB, a.getMetrics().getAllocatedMB()); // One more should work, for app_1, due to user-limit-factor - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -505,7 +506,7 @@ public void testSingleQueueWithOneUser() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, 
a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -518,7 +519,7 @@ public void testSingleQueueWithOneUser() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -532,7 +533,7 @@ public void testSingleQueueWithOneUser() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(0*GB, a.getUsedResources().getMemory()); @@ -620,19 +621,19 @@ public void testUserLimits() throws Exception { // recordFactory))); // 1 container to user_0 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); // One more to user_0 since he is the only active user - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); @@ -705,7 +706,7 @@ public void testHeadroomWithMaxCap() throws Exception { 1, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -713,7 +714,7 @@ public void testHeadroomWithMaxCap() throws Exception { assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G // Again one to user_0 since he hasn't exceeded user limit yet - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -729,7 +730,7 @@ public void testHeadroomWithMaxCap() throws Exception { // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -743,7 +744,7 @@ public void testHeadroomWithMaxCap() throws Exception { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, 
priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @@ -813,21 +814,21 @@ public void testSingleQueueWithMultipleUsers() throws Exception { */ // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Can't allocate 3rd due to user-limit a.setUserLimit(25); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -845,7 +846,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -854,7 +855,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now allocations should goto app_0 since // user_0 is at user-limit not above it - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -864,7 +865,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Test max-capacity // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -875,7 +876,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Now, allocations should goto app_3 since it's under user-limit a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(7*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -883,7 +884,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); // Now we should assign to app_3 again since user_2 is under user-limit - a.assignContainers(clusterResource, node_0); + 
a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -896,7 +897,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -910,7 +911,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -924,7 +925,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); + RMContainerEventType.KILL, null, true); } assertEquals(0*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -982,7 +983,7 @@ public void testReservation() throws Exception { // Start testing... // Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -992,7 +993,7 @@ public void testReservation() throws Exception { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1000,7 +1001,7 @@ public void testReservation() throws Exception { assertEquals(2*GB, a.getMetrics().getAllocatedMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1015,8 +1016,8 @@ public void testReservation() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1031,8 +1032,8 @@ public void testReservation() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", 
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1099,7 +1100,7 @@ public void testStolenReservedContainer() throws Exception { // Start testing... - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1108,7 +1109,7 @@ public void testStolenReservedContainer() throws Exception { assertEquals(0*GB, a.getMetrics().getAvailableMB()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1121,7 +1122,7 @@ public void testStolenReservedContainer() throws Exception { // We do not need locality delay here doReturn(-1).when(a).getNodeLocalityDelay(); - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(10*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1136,8 +1137,8 @@ public void testStolenReservedContainer() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8*GB, app_1.getCurrentConsumption().getMemory()); @@ -1205,20 +1206,20 @@ public void testReservationExchange() throws Exception { // Start testing... 
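// (Reviewer note, added for readability: the sequence below walks the full
// reservation-exchange lifecycle that this patch touches. app_0 takes
// 1 GB + 1 GB on node_0, app_1 then reserves 4 GB there; after one of
// app_0's containers is killed the reservation is re-reserved twice rather
// than allocated, and a later node_1 heartbeat moves the reservation before
// it is finally exchanged for a real allocation.)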
// Only 1 container - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Now, reservation should kick in for app_1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1231,8 +1232,8 @@ public void testReservationExchange() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1241,7 +1242,7 @@ public void testReservationExchange() throws Exception { assertEquals(1, app_1.getReReservations(priority)); // Re-reserve - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1250,7 +1251,7 @@ public void testReservationExchange() throws Exception { assertEquals(2, app_1.getReReservations(priority)); // Try to schedule on node_1 now, should *move* the reservation - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); assertEquals(9*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1266,8 +1267,8 @@ public void testReservationExchange() throws Exception { ContainerStatus.newInstance(rmContainer.getContainerId(), ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), - RMContainerEventType.KILL, null); - CSAssignment assignment = a.assignContainers(clusterResource, node_0); + RMContainerEventType.KILL, null, true); + CSAssignment assignment = a.assignContainers(clusterResource, node_0, false); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1278,6 +1279,7 @@ public void testReservationExchange() throws Exception { } + @Test public void testLocalityScheduling() throws Exception { @@ -1337,7 +1339,7 @@ public void testLocalityScheduling() throws Exception { CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = 
a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); @@ -1345,7 +1347,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); @@ -1353,7 +1355,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); @@ -1362,7 +1364,7 @@ public void testLocalityScheduling() throws Exception { // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - assignment = a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2, false); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset @@ -1370,7 +1372,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - assignment = a.assignContainers(clusterResource, node_0); + assignment = a.assignContainers(clusterResource, node_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1378,7 +1380,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - assignment = a.assignContainers(clusterResource, node_1); + assignment = a.assignContainers(clusterResource, node_1, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1406,13 +1408,13 @@ public void testLocalityScheduling() throws Exception { doReturn(1).when(a).getNodeLocalityDelay(); // Shouldn't assign RACK_LOCAL yet - assignment = a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3, false); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now - assignment = a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3, false); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1493,7 +1495,7 @@ public void testApplicationPriorityScheduling() throws Exception { // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); @@ -1505,7 +1507,7 @@ public void testApplicationPriorityScheduling() throws Exception { // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); @@ -1516,7 +1518,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 - a.assignContainers(clusterResource, node_2); + a.assignContainers(clusterResource, node_2, false); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); @@ -1527,7 +1529,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1538,7 +1540,7 @@ public void testApplicationPriorityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 - a.assignContainers(clusterResource, node_1); + a.assignContainers(clusterResource, node_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1614,7 +1616,7 @@ public void testSchedulingConstraints() throws Exception { app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 - a.assignContainers(clusterResource, node_0_0); + a.assignContainers(clusterResource, node_0_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1622,7 +1624,7 @@ public void testSchedulingConstraints() throws Exception { // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero @@ -1638,14 +1640,14 @@ public void 
testSchedulingConstraints() throws Exception { // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 - a.assignContainers(clusterResource, node_0_1); + a.assignContainers(clusterResource, node_0_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1889,7 +1891,7 @@ public void testLocalityConstraints() throws Exception { // node_0_1 // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false - a.assignContainers(clusterResource, node_0_1); + a.assignContainers(clusterResource, node_0_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1911,7 +1913,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1941,7 +1943,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1969,7 +1971,7 @@ public void testLocalityConstraints() throws Exception { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -1995,7 +1997,7 @@ public void testLocalityConstraints() throws Exception { // Blacklist: < host_0_0 > <---- // Now, should allocate since RR(rack_1) = relax: true - a.assignContainers(clusterResource, node_1_1); + a.assignContainers(clusterResource, node_1_1, false); verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2025,7 +2027,7 @@ public void testLocalityConstraints() throws Exception { // host_1_0: 8G // host_1_1: 7G - a.assignContainers(clusterResource, node_1_0); + a.assignContainers(clusterResource, node_1_0, false); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); 
assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2105,7 +2107,7 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() recordFactory))); try { - a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0, false); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index fa9edb11258..8b24a7ee26f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -153,7 +154,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). - when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -164,7 +165,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } }). 
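// Note: both assignContainers stubs in this helper (the doReturn earlier
// and the doAnswer whose when(...) is completed just below) must now match
// the widened three-argument signature. They pin the new flag with
// eq(false) because these tests always drive root.assignContainers(...,
// false); the InOrder verifications in the tests instead relax the match
// to anyBoolean().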
- when(queue).assignContainers(eq(clusterResource), eq(node)); + when(queue).assignContainers(eq(clusterResource), eq(node), eq(false)); } private float computeQueueAbsoluteUsedCapacity(CSQueue queue, @@ -227,19 +228,19 @@ public void testSingleLevelQueues() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -247,12 +248,12 @@ public void testSingleLevelQueues() throws Exception { // since A has 2/6G while B has 2/14G stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(b, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -260,12 +261,12 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 4/14G stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 4*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -273,12 +274,12 @@ public void testSingleLevelQueues() throws Exception { // since A has 3/6G while B has 8/14G stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 
9*GB, clusterResource); } @@ -439,7 +440,7 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 0*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -451,7 +452,7 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a, clusterResource, node_1, 0*GB); stubQueueAllocation(b2, clusterResource, node_1, 4*GB); stubQueueAllocation(c, clusterResource, node_1, 0*GB); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -462,14 +463,14 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(a1, clusterResource, node_0, 1*GB); stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -488,16 +489,16 @@ public void testMultiLevelQueues() throws Exception { stubQueueAllocation(b3, clusterResource, node_2, 1*GB); stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); - root.assignContainers(clusterResource, node_2); + root.assignContainers(clusterResource, node_2, false); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -597,7 +598,7 @@ public void testOffSwitchScheduling() throws Exception { // Simulate B returning a container on node_0 stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); @@ -605,12 
+606,12 @@ public void testOffSwitchScheduling() throws Exception { // also, B gets a scheduling opportunity since A allocates RACK_LOCAL stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -619,12 +620,12 @@ public void testOffSwitchScheduling() throws Exception { // However, since B returns off-switch, A won't get an opportunity stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -663,7 +664,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // Simulate B3 returning a container on node_0 stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); verifyQueueMetrics(b2, 0*GB, clusterResource); verifyQueueMetrics(b3, 1*GB, clusterResource); @@ -671,12 +672,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1); + root.assignContainers(clusterResource, node_1, false); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -685,12 +686,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { // However, since B3 returns off-switch, B2 won't get an opportunity stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_0); + root.assignContainers(clusterResource, node_0, false); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + 
any(FiCaSchedulerNode.class), anyBoolean()); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class)); + any(FiCaSchedulerNode.class), anyBoolean()); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java new file mode 100644 index 00000000000..0f8290e9255 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -0,0 +1,1184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestReservations {
+
+  private static final Log LOG = LogFactory.getLog(TestReservations.class);
+
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  RMContext rmContext;
+  CapacityScheduler cs;
+  // CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+
+  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+
+  CSQueue root;
+  Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+  Map<String, CSQueue> oldQueues = new HashMap<String, CSQueue>();
+
+  final static int GB = 1024;
+  final static String DEFAULT_RACK = "/default";
+
+  @Before
+  public void setUp() throws Exception {
+    CapacityScheduler spyCs = new CapacityScheduler();
+    cs = spy(spyCs);
+    rmContext = TestUtils.getMockRMContext();
+
+  }
+
+  private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
+
+    csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
+    final String newRoot = "root" + System.currentTimeMillis();
+    // final String newRoot = "root";
+
+    setupQueueConfiguration(csConf, newRoot);
+    YarnConfiguration conf = new YarnConfiguration();
+    cs.setConf(conf);
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(16 * GB, 12));
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 12));
+    when(csContext.getApplicationComparator()).thenReturn(
+        CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).thenReturn(
+        CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+    RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
+        conf);
+    containerTokenSecretManager.rollMasterKey();
+    when(csContext.getContainerTokenSecretManager()).thenReturn(
+        containerTokenSecretManager);
+
+    root = CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
+
+    cs.setRMContext(rmContext);
+    cs.init(csConf);
+    cs.start();
+  }
+
+  private static final String A = "a";
+
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
+      final String newRoot) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { newRoot }); + conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); + conf.setAcl(CapacitySchedulerConfiguration.ROOT, + QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + + newRoot; + conf.setQueues(Q_newRoot, new String[] { A }); + conf.setCapacity(Q_newRoot, 100); + conf.setMaximumCapacity(Q_newRoot, 100); + conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_A = Q_newRoot + "." + A; + conf.setCapacity(Q_A, 100f); + conf.setMaximumCapacity(Q_A, 100); + conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); + + } + + static LeafQueue stubLeafQueue(LeafQueue queue) { + + // Mock some methods for ease in these unit tests + + // 1. LeafQueue.createContainer to return dummy containers + doAnswer(new Answer() { + @Override + public Container answer(InvocationOnMock invocation) throws Throwable { + final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation + .getArguments()[0]); + final ContainerId containerId = TestUtils + .getMockContainerId(application); + + Container container = TestUtils.getMockContainer(containerId, + ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(), + (Resource) (invocation.getArguments()[2]), + ((Priority) invocation.getArguments()[3])); + return container; + } + }).when(queue).createContainer(any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class)); + + // 2. Stub out LeafQueue.parent.completedContainer + CSQueue parent = queue.getParent(); + doNothing().when(parent).completedContainer(any(Resource.class), + any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), + any(RMContainer.class), any(ContainerStatus.class), + any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); + + return queue; + } + + @Test + public void testReservation() throws Exception { + // Test that we now unreserve and use a node that has space + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 3; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = 
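// (Reviewer note: the three mock priorities assigned here and just below
// model an MR-style app, with the AM at priority 1, maps at priority 5 and
// reduces at priority 10, so each assignContainers pass serves the AM
// request first, then maps, then reduces; the assertions that follow rely
// on that ordering.)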
TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(22 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(19 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // try to assign reducer (5G on node 0 and should reserve) + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(11 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // assign reducer to node 2 + a.assignContainers(clusterResource, node_2, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, 
app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); + + // node_1 heartbeat and unreserves from node_0 in order to allocate + // on node_1 + a.assignContainers(clusterResource, node_1, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(18 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(8 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(0, app_0.getTotalRequiredResources(priorityReduce)); + } + + @Test + public void testReservationNoContinueLook() throws Exception { + // Test that with reservations-continue-look-all-nodes feature off + // we don't unreserve and show we could get stuck + + queues = new HashMap(); + // test that the deadlock occurs when turned off + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setBoolean( + "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 3; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + 
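// (Reviewer note: with reservations-continue-look-all-nodes forced off
// above, the request pattern below is identical to testReservation's, one
// 2 GB AM, two 3 GB maps and two 5 GB reduces, but this time the final
// node_1 heartbeat is expected to leave the 5 GB reservation stuck on
// node_0 instead of unreserving it.)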
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(19 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // try to assign reducer (5G on node 0 and should reserve)
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(11 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // assign reducer to node 2
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
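+    // Same sequence as the previous test up to this point; the divergence
+    // comes at the next node_1 heartbeat, where the disabled feature means
+    // no unreserve-and-reallocate will happen.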
+
+    // node_1 heartbeats but won't unreserve from node_0; potentially stuck
+    // if the AM doesn't handle it
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(18 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
+  }
+
+  @Test
+  public void testAssignContainersNeedToUnreserve() throws Exception {
+    // Test that we now unreserve and use a node that has space
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
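+    // With only two 8GB nodes, once the 5GB reservation below is made the
+    // queue headroom drops to 3GB, so the second reducer can only be placed
+    // by unreserving first.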
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(8 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // try to assign reducer (5G on node 0 and should reserve)
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource()
+        .getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
+
+    // could allocate, but told it needs to unreserve first
+    a.assignContainers(clusterResource, node_1, true);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(8 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(1, app_0.getTotalRequiredResources(priorityReduce));
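+    // In a single heartbeat the 5GB reservation on node_0 was dropped and
+    // the reducer allocated on node_1 instead: reservedMB went 5GB -> 0
+    // while allocatedMB went 8GB -> 13GB.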
+  }
+
+  @Test
+  public void testGetAppToUnreserve() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+    final String user_0 = "user_0";
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    // Setup resource-requests
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Resource capability = Resources.createResource(2 * GB, 0);
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+    RMContext rmContext = mock(RMContext.class);
+    ContainerAllocationExpirer expirer =
+        mock(ContainerAllocationExpirer.class);
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        app_0.getApplicationId(), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container = TestUtils.getMockContainer(containerId,
+        node_1.getNodeID(), Resources.createResource(2 * GB), priorityMap);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        node_1.getNodeID(), "user", rmContext);
+
+    Container container_1 = TestUtils.getMockContainer(containerId,
+        node_0.getNodeID(), Resources.createResource(1 * GB), priorityMap);
+    RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId,
+        node_0.getNodeID(), "user", rmContext);
+
+    // no reserved containers
+    NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // no reserved containers - reserve then unreserve
+    app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
+    app_0.unreserve(node_0, priorityMap);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // no container large enough is reserved
+    app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(null, unreserveId);
+
+    // reserve one that is now large enough
+    app_0.reserve(node_1, priorityMap, rmContainer, container);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    assertEquals(node_1.getNodeID(), unreserveId);
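+    // getNodeIdToUnreserve only offers up a node whose reserved container is
+    // at least the requested capability (2GB here); the 1GB reservation on
+    // node_0 is therefore never returned.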
+  }
+
+  @Test
+  public void testFindNodeToUnreserve() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+    final String user_0 = "user_0";
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+
+    // Setup resource-requests
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Resource capability = Resources.createResource(2 * GB, 0);
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+    RMContext rmContext = mock(RMContext.class);
+    ContainerAllocationExpirer expirer =
+        mock(ContainerAllocationExpirer.class);
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        app_0.getApplicationId(), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container = TestUtils.getMockContainer(containerId,
+        node_1.getNodeID(), Resources.createResource(2 * GB), priorityMap);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        node_1.getNodeID(), "user", rmContext);
+
+    // nothing reserved
+    boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
+        node_1, app_0, priorityMap, capability);
+    assertFalse(res);
+
+    // reserved but scheduler doesn't know about that node.
+    app_0.reserve(node_1, priorityMap, rmContainer, container);
+    node_1.reserveResource(app_0, priorityMap, rmContainer);
+    res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
+        priorityMap, capability);
+    assertFalse(res);
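+    // findNodeToUnreserve re-validates the reservation against the
+    // scheduler's view of the node; since csContext was never stubbed to
+    // return node_1, the lookup fails and the unreserve is refused.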
+  }
+
+  @Test
+  public void testAssignToQueue() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    Resource capability = Resources.createResource(32 * GB, 0);
+    boolean res = a.assignToQueue(clusterResource, capability, app_0, true);
+    assertFalse(res);
+
+    // now add in reservations and make sure it continues if config set
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    capability = Resources.createResource(5 * GB, 0);
+    res = a.assignToQueue(clusterResource, capability, app_0, true);
+    assertTrue(res);
+
+    // tell it not to check reservations
+    res = a.assignToQueue(clusterResource, capability, app_0, false);
+    assertFalse(res);
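+    // With checkReservations=true the queue may subtract its 5GB of
+    // reservations before comparing the potential capacity against
+    // absoluteMaxCapacity, so the 5GB ask passes; with false the reservation
+    // stays counted and the same ask is refused.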
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+
+    // should return false no matter what checkReservations is passed
+    // in since feature is off
+    res = a.assignToQueue(clusterResource, capability, app_0, false);
+    assertFalse(res);
+
+    res = a.assignToQueue(clusterResource, capability, app_0, true);
+    assertFalse(res);
+  }
+
+  public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
+      CapacitySchedulerConfiguration csConf) throws Exception {
+    // before reinitialization
+    assertEquals(true, a.getReservationContinueLooking());
+    assertEquals(true,
+        ((ParentQueue) a.getParent()).getReservationContinueLooking());
+
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+        TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResource());
+
+    // after reinitialization
+    assertEquals(false, a.getReservationContinueLooking());
+    assertEquals(false,
+        ((ParentQueue) a.getParent()).getReservationContinueLooking());
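+    // reinitialize() re-parses the hierarchy from the updated csConf and
+    // pushes the new settings into the existing queue objects, which is why
+    // the flag flips on the very same leaf and parent instances the test
+    // still holds references to.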
+  }
+
+  @Test
+  public void testContinueLookingReservationsAfterQueueRefresh()
+      throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+  }
+
+  @Test
+  public void testAssignToUser() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(14 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(8 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(null, node_0.getReservedContainer());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // now add in reservations and make sure it continues if config set
+    // allocate to queue so that the potential new capacity is greater than
+    // absoluteMaxCapacity
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
+
+    assertEquals(5 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    // set limit so that once it subtracts reservations it can continue
+    Resource limit = Resources.createResource(12 * GB, 0);
+    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+        true);
+    assertTrue(res);
+
+    // tell it not to check for reservations and it should fail as it's
+    // already over the limit
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, false);
+    assertFalse(res);
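+    // User-limit arithmetic: 13GB consumed minus the 5GB reservation leaves
+    // 8GB, under the 12GB limit when reservations are subtracted; counted in
+    // full, 13GB is already over the limit and the check fails.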
+
+    refreshQueuesTurnOffReservationsContLook(a, csConf);
+
+    // should now return false since feature off
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, true);
+    assertFalse(res);
+  }
+
+  @Test
+  public void testReservationsNoneAvailable() throws Exception {
+    // Test that we don't reserve (or allocate) when told to unreserve first
+    // and nothing is currently reserved
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+    Priority priorityLast = TestUtils.createMockPriority(12);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
+            priorityMap, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
+            priorityLast, recordFactory)));
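+    // The 8GB requests at priorityLast are deliberately large: later in the
+    // test one of them can be reserved on a node, but a second attempt runs
+    // into the queue's limits and must not create another reservation.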
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map - simulating reduce
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(5 * GB, a.getUsedResources().getMemory());
+    assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(5 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(19 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Only 1 map to other node - simulating reduce
+    a.assignContainers(clusterResource, node_1, false);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // try to assign reducer (5G on node 0), but tell it
+    // it has to unreserve. No room to allocate and it shouldn't reserve
+    // since nothing is currently reserved.
+    a.assignContainers(clusterResource, node_0, true);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
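+    // needToUnreserve=true tells the queue an allocation is only acceptable
+    // if it can free an existing reservation first; with nothing reserved,
+    // it must neither allocate nor create a new reservation.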
+
+    // try to assign reducer (5G on node 2), but tell it
+    // it has to unreserve. Has room but shouldn't reserve
+    // since nothing is currently reserved.
+    a.assignContainers(clusterResource, node_2, true);
+    assertEquals(8 * GB, a.getUsedResources().getMemory());
+    assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(16 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // let it assign 5G to node_2
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(13 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(11 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(11 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+
+    // reserve 8G node_0
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(21 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+
+    // try to assign (8G on node 2). No room to allocate,
+    // continued to try due to having a reservation above,
+    // but hits queue limits so it can't reserve any more.
+    a.assignContainers(clusterResource, node_2, false);
+    assertEquals(21 * GB, a.getUsedResources().getMemory());
+    assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(13 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(3 * GB, app_0.getHeadroom().getMemory());
+    assertEquals(5 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(5 * GB, node_2.getUsedResource().getMemory());
+  }
+}