diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 44b87e5d283..a8307713c6c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -102,6 +102,9 @@ Release 2.8.0 - UNRELEASED YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda) + YARN-2498. Respect labels in preemption policy of capacity scheduler for + inter-queue preemption. (Wangda Tan via jianhe) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 2ab41979306..1f47b5f5223 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,11 +27,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.PriorityQueue; import java.util.Set; +import java.util.TreeSet; -import org.apache.commons.collections.map.HashedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; /** * This class implement a {@link SchedulingEditPolicy} that is designed to be @@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; - private Map> labels; + private Map> 
queueToPartitions = + new HashMap<>(); + private RMNodeLabelsManager nlm; public ProportionalCapacityPreemptionPolicy() { clock = new SystemClock(); @@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); - labels = null; + nlm = scheduler.getRMContext().getNodeLabelManager(); } @VisibleForTesting @@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public void editSchedule() { CSQueue root = scheduler.getRootQueue(); Resource clusterResources = Resources.clone(scheduler.getClusterResource()); - clusterResources = getNonLabeledResources(clusterResources); - setNodeLabels(scheduler.getRMContext().getNodeLabelManager() - .getNodeLabels()); containerBasedPreemptOrKill(root, clusterResources); } - - /** - * Setting Node Labels - * - * @param nodelabels - */ - public void setNodeLabels(Map> nodelabels) { - labels = nodelabels; - } - - /** - * This method returns all non labeled resources. - * - * @param clusterResources - * @return Resources - */ - private Resource getNonLabeledResources(Resource clusterResources) { - RMContext rmcontext = scheduler.getRMContext(); - RMNodeLabelsManager lm = rmcontext.getNodeLabelManager(); - Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, - clusterResources); - return res == null ? clusterResources : res; - } /** * This method selects and tracks containers to be preempted. 
If a container @@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic */ private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { + // All partitions to look at + Set allPartitions = new HashSet<>(); + allPartitions.addAll(scheduler.getRMContext() + .getNodeLabelManager().getClusterNodeLabelNames()); + allPartitions.add(RMNodeLabelsManager.NO_LABEL); // extract a summary of the queues from scheduler - TempQueue tRoot; synchronized (scheduler) { - tRoot = cloneQueues(root, clusterResources); + queueToPartitions.clear(); + + for (String partitionToLookAt : allPartitions) { + cloneQueues(root, + nlm.getResourceByLabel(partitionToLookAt, clusterResources), + partitionToLookAt); + } } - // compute the ideal distribution of resources among queues - // updates cloned queues state accordingly - tRoot.idealAssigned = tRoot.guaranteed; + // compute total preemption allowed Resource totalPreemptionAllowed = Resources.multiply(clusterResources, percentageClusterPreemptionAllowed); - List queues = - recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); + + Set leafQueueNames = null; + for (String partition : allPartitions) { + TempQueuePerPartition tRoot = + getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition); + // compute the ideal distribution of resources among queues + // updates cloned queues state accordingly + tRoot.idealAssigned = tRoot.guaranteed; + + leafQueueNames = + recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); + } // based on ideal allocation select containers to be preempted from each // queue and each application Map> toPreempt = - getContainersToPreempt(queues, clusterResources); + getContainersToPreempt(leafQueueNames, clusterResources); if (LOG.isDebugEnabled()) { - logToCSV(queues); + logToCSV(new ArrayList(leafQueueNames)); } // if we are in observeOnly mode return before any action is taken @@ -252,6 +248,10 @@ public class 
ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // preempt (or kill) the selected containers for (Map.Entry> e : toPreempt.entrySet()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + e.getKey() + + " #containers-to-be-preempted=" + e.getValue().size()); + } for (RMContainer container : e.getValue()) { // if we tried to preempt this for more than maxWaitTime if (preempted.get(container) != null && @@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param totalPreemptionAllowed maximum amount of preemption allowed * @return a list of leaf queues updated with preemption targets */ - private List recursivelyComputeIdealAssignment( - TempQueue root, Resource totalPreemptionAllowed) { - List leafs = new ArrayList(); + private Set recursivelyComputeIdealAssignment( + TempQueuePerPartition root, Resource totalPreemptionAllowed) { + Set leafQueueNames = new HashSet<>(); if (root.getChildren() != null && root.getChildren().size() > 0) { // compute ideal distribution at this level computeIdealResourceDistribution(rc, root.getChildren(), totalPreemptionAllowed, root.idealAssigned); // compute recursively for lower levels and build list of leafs - for(TempQueue t : root.getChildren()) { - leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed)); + for(TempQueuePerPartition t : root.getChildren()) { + leafQueueNames.addAll(recursivelyComputeIdealAssignment(t, + totalPreemptionAllowed)); } } else { // we are in a leaf nothing to do, just return yourself - return Collections.singletonList(root); + return ImmutableSet.of(root.queueName); } - return leafs; + return leafQueueNames; } /** @@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param tot_guarant the amount of capacity assigned to this pool of queues */ private void computeIdealResourceDistribution(ResourceCalculator rc, - List queues, Resource 
totalPreemptionAllowed, Resource tot_guarant) { + List queues, Resource totalPreemptionAllowed, + Resource tot_guarant) { // qAlloc tracks currently active queues (will decrease progressively as // demand is met) - List qAlloc = new ArrayList(queues); + List qAlloc = new ArrayList(queues); // unassigned tracks how much resources are still to assign, initialized // with the total capacity for this set of queues Resource unassigned = Resources.clone(tot_guarant); // group queues based on whether they have non-zero guaranteed capacity - Set nonZeroGuarQueues = new HashSet(); - Set zeroGuarQueues = new HashSet(); + Set nonZeroGuarQueues = new HashSet(); + Set zeroGuarQueues = new HashSet(); - for (TempQueue q : qAlloc) { + for (TempQueuePerPartition q : qAlloc) { if (Resources .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) { nonZeroGuarQueues.add(q); @@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // based on ideal assignment computed above and current assignment we derive // how much preemption is required overall Resource totPreemptionNeeded = Resource.newInstance(0, 0); - for (TempQueue t:queues) { + for (TempQueuePerPartition t:queues) { if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) { Resources.addTo(totPreemptionNeeded, Resources.subtract(t.current, t.idealAssigned)); @@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // assign to each queue the amount of actual preemption based on local // information of ideal preemption and scaling factor - for (TempQueue t : queues) { + for (TempQueuePerPartition t : queues) { t.assignPreemption(scalingFactor, rc, tot_guarant); } if (LOG.isDebugEnabled()) { long time = clock.getTime(); - for (TempQueue t : queues) { + for (TempQueuePerPartition t : queues) { LOG.debug(time + ": " + t); } } @@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements 
SchedulingEditPolic * distributed uniformly. */ private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection qAlloc, Resource unassigned, - boolean ignoreGuarantee) { + Resource tot_guarant, Collection qAlloc, + Resource unassigned, boolean ignoreGuarantee) { // Prior to assigning the unused resources, process each queue as follows: // If current > guaranteed, idealAssigned = guaranteed + untouchable extra // Else idealAssigned = current; @@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // idealAssigned >= current + pending), remove it from consideration. // Sort queues from most under-guaranteed to most over-guaranteed. TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = - new PriorityQueue(10,tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { - TempQueue q = i.next(); + PriorityQueue orderedByNeed = + new PriorityQueue(10, tqComparator); + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueuePerPartition q = i.next(); if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra); } else { @@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // place it back in the ordered list of queues, recalculating its place // in the order of most under-guaranteed to most over-guaranteed. In this // way, the most underserved queue(s) are always given resources first. 
- Collection underserved = + Collection underserved = getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator i = underserved.iterator(); i.hasNext();) { - TempQueue sub = i.next(); + for (Iterator i = underserved.iterator(); i + .hasNext();) { + TempQueuePerPartition sub = i.next(); Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); Resource wQidle = sub.offer(wQavail, rc, tot_guarant); @@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // Take the most underserved TempQueue (the one on the head). Collect and // return the list of all queues that have the same idealAssigned // percentage of guaranteed. - protected Collection getMostUnderservedQueues( - PriorityQueue orderedByNeed, TQComparator tqComparator) { - ArrayList underserved = new ArrayList(); + protected Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, TQComparator tqComparator) { + ArrayList underserved = new ArrayList(); while (!orderedByNeed.isEmpty()) { - TempQueue q1 = orderedByNeed.remove(); + TempQueuePerPartition q1 = orderedByNeed.remove(); underserved.add(q1); - TempQueue q2 = orderedByNeed.peek(); + TempQueuePerPartition q2 = orderedByNeed.peek(); // q1's pct of guaranteed won't be larger than q2's. If it's less, then // return what has already been collected. 
Otherwise, q1's pct of // guaranteed == that of q2, so add q2 to underserved list during the @@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param queues the list of queues to consider */ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection queues, boolean ignoreGuar) { + Collection queues, boolean ignoreGuar) { Resource activeCap = Resource.newInstance(0, 0); if (ignoreGuar) { - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { q.normalizedGuarantee = (float) 1.0f / ((float) queues.size()); } } else { - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { Resources.addTo(activeCap, q.guaranteed); } - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { q.normalizedGuarantee = Resources.divide(rc, clusterResource, q.guaranteed, activeCap); } } } + private String getPartitionByNodeId(NodeId nodeId) { + return scheduler.getSchedulerNode(nodeId).getPartition(); + } + + /** + * Return should we preempt rmContainer. If we should, deduct from + * resourceToObtainByPartition + */ + private boolean tryPreemptContainerAndDeductResToObtain( + Map resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map> preemptMap) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); + Resource toObtainByPartition = + resourceToObtainByPartitions.get(nodePartition); + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none())) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + // When we have no more resource need to obtain, remove from map. 
+ if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Marked container=" + rmContainer.getContainerId() + + " in partition=" + nodePartition + " will be preempted"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } + + private void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + /** * Based a resource preemption target drop reservations of containers and * if necessary select containers for preemption from applications in each @@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @return a map of applciationID to set of containers to preempt */ private Map> getContainersToPreempt( - List queues, Resource clusterResource) { + Set leafQueueNames, Resource clusterResource) { - Map> preemptMap = - new HashMap>(); + Map> preemptMap = + new HashMap>(); List skippedAMContainerlist = new ArrayList(); - for (TempQueue qT : queues) { - if (qT.preemptionDisabled && qT.leafQueue != null) { + // Loop all leaf queues + for (String queueName : leafQueueNames) { + // check if preemption disabled for the queue + if (getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { if (LOG.isDebugEnabled()) { - if (Resources.greaterThan(rc, clusterResource, - qT.toBePreempted, Resource.newInstance(0, 0))) { - 
LOG.debug("Tried to preempt the following " - + "resources from non-preemptable queue: " - + qT.queueName + " - Resources: " + qT.toBePreempted); - } + LOG.debug("skipping from queue=" + queueName + + " because it's a non-preemptable queue"); } continue; } - // we act only if we are violating balance by more than - // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, qT.current, - Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { - // we introduce a dampening factor naturalTerminationFactor that - // accounts for natural termination of containers - Resource resToObtain = - Resources.multiply(qT.toBePreempted, naturalTerminationFactor); - Resource skippedAMSize = Resource.newInstance(0, 0); - // lock the leafqueue while we scan applications and unreserve - synchronized (qT.leafQueue) { - Iterator desc = - qT.leafQueue.getOrderingPolicy().getPreemptionIterator(); - qT.actuallyPreempted = Resources.clone(resToObtain); - while (desc.hasNext()) { - FiCaSchedulerApp fc = desc.next(); - if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, - Resources.none())) { - break; + // compute resToObtainByPartition considered inter-queue preemption + LeafQueue leafQueue = null; + + Map resToObtainByPartition = + new HashMap(); + for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { + leafQueue = qT.leafQueue; + // we act only if we are violating balance by more than + // maxIgnoredOverCapacity + if (Resources.greaterThan(rc, clusterResource, qT.current, + Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { + // we introduce a dampening factor naturalTerminationFactor that + // accounts for natural termination of containers + Resource resToObtain = + Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + // Only add resToObtain when it >= 0 + if (Resources.greaterThan(rc, clusterResource, resToObtain, + Resources.none())) { + resToObtainByPartition.put(qT.partition, resToObtain); + if 
(LOG.isDebugEnabled()) { + LOG.debug("Queue=" + queueName + " partition=" + qT.partition + + " resource-to-obtain=" + resToObtain); } - preemptMap.put( - fc.getApplicationAttemptId(), - preemptFrom(fc, clusterResource, resToObtain, - skippedAMContainerlist, skippedAMSize)); } - Resource maxAMCapacityForThisQueue = Resources.multiply( - Resources.multiply(clusterResource, - qT.leafQueue.getAbsoluteCapacity()), - qT.leafQueue.getMaxAMResourcePerQueuePercent()); - - // Can try preempting AMContainers (still saving atmost - // maxAMCapacityForThisQueue AMResource's) if more resources are - // required to be preempted from this Queue. - preemptAMContainers(clusterResource, preemptMap, - skippedAMContainerlist, resToObtain, skippedAMSize, - maxAMCapacityForThisQueue); + qT.actuallyPreempted = Resources.clone(resToObtain); + } else { + qT.actuallyPreempted = Resources.none(); } } + + synchronized (leafQueue) { + // go through all ignore-partition-exclusivity containers first to make + // sure such containers will be preempted first + Map> ignorePartitionExclusivityContainers = + leafQueue.getIgnoreExclusivityRMContainers(); + for (String partition : resToObtainByPartition.keySet()) { + if (ignorePartitionExclusivityContainers.containsKey(partition)) { + TreeSet rmContainers = + ignorePartitionExclusivityContainers.get(partition); + // We will check container from reverse order, so latter submitted + // application's containers will be preempted first. 
+ for (RMContainer c : rmContainers.descendingSet()) { + boolean preempted = + tryPreemptContainerAndDeductResToObtain( + resToObtainByPartition, c, clusterResource, preemptMap); + if (!preempted) { + break; + } + } + } + } + + // preempt other containers + Resource skippedAMSize = Resource.newInstance(0, 0); + Iterator desc = + leafQueue.getOrderingPolicy().getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + // When we complete preempt from one partition, we will remove from + // resToObtainByPartition, so when it becomes empty, we can get no + // more preemption is needed + if (resToObtainByPartition.isEmpty()) { + break; + } + + preemptFrom(fc, clusterResource, resToObtainByPartition, + skippedAMContainerlist, skippedAMSize, preemptMap); + } + + // Can try preempting AMContainers (still saving atmost + // maxAMCapacityForThisQueue AMResource's) if more resources are + // required to be preempted from this Queue. + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + leafQueue.getAbsoluteCapacity()), + leafQueue.getMaxAMResourcePerQueuePercent()); + + preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, + resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue); + } } + return preemptMap; } @@ -595,31 +706,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic */ private void preemptAMContainers(Resource clusterResource, Map> preemptMap, - List skippedAMContainerlist, Resource resToObtain, - Resource skippedAMSize, Resource maxAMCapacityForThisQueue) { + List skippedAMContainerlist, + Map resToObtainByPartition, Resource skippedAMSize, + Resource maxAMCapacityForThisQueue) { for (RMContainer c : skippedAMContainerlist) { // Got required amount of resources for preemption, can stop now - if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, - Resources.none())) { + if (resToObtainByPartition.isEmpty()) { break; } 
// Once skippedAMSize reaches down to maxAMCapacityForThisQueue, - // container selection iteration for preemption will be stopped. + // container selection iteration for preemption will be stopped. if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, maxAMCapacityForThisQueue)) { break; } - Set contToPrempt = preemptMap.get(c - .getApplicationAttemptId()); - if (null == contToPrempt) { - contToPrempt = new HashSet(); - preemptMap.put(c.getApplicationAttemptId(), contToPrempt); + + boolean preempted = + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + if (preempted) { + Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } - contToPrempt.add(c); - - Resources.subtractFrom(resToObtain, c.getContainer().getResource()); - Resources.subtractFrom(skippedAMSize, c.getContainer() - .getResource()); } skippedAMContainerlist.clear(); } @@ -627,71 +734,59 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic /** * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservation for that app). 
- * - * @param app - * @param clusterResource - * @param rsrcPreempt - * @return Set Set of RMContainers */ - private Set preemptFrom(FiCaSchedulerApp app, - Resource clusterResource, Resource rsrcPreempt, - List skippedAMContainerlist, Resource skippedAMSize) { - Set ret = new HashSet(); + private void preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Map resToObtainByPartition, + List skippedAMContainerlist, Resource skippedAMSize, + Map> preemptMap) { ApplicationAttemptId appId = app.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Looking at application=" + app.getApplicationAttemptId() + + " resourceToObtain=" + resToObtainByPartition); + } // first drop reserved containers towards rsrcPreempt - List reservations = + List reservedContainers = new ArrayList(app.getReservedContainers()); - for (RMContainer c : reservations) { - if (Resources.lessThanOrEqual(rc, clusterResource, - rsrcPreempt, Resources.none())) { - return ret; + for (RMContainer c : reservedContainers) { + if (resToObtainByPartition.isEmpty()) { + return; } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + if (!observeOnly) { dispatcher.handle(new ContainerPreemptEvent(appId, c, ContainerPreemptEventType.DROP_RESERVATION)); } - Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } // if more resources are to be freed go through all live containers in // reverse priority and reverse allocation order and mark them for // preemption - List containers = + List liveContainers = new ArrayList(app.getLiveContainers()); - sortContainers(containers); + sortContainers(liveContainers); - for (RMContainer c : containers) { - if (Resources.lessThanOrEqual(rc, clusterResource, - rsrcPreempt, Resources.none())) { - return ret; + for (RMContainer c : liveContainers) { + if (resToObtainByPartition.isEmpty()) { + return; } + // Skip AM Container from preemption for now. 
if (c.isAMContainer()) { skippedAMContainerlist.add(c); - Resources.addTo(skippedAMSize, c.getContainer().getResource()); + Resources.addTo(skippedAMSize, c.getAllocatedResource()); continue; } - // skip Labeled resource - if(isLabeledContainer(c)){ - continue; - } - ret.add(c); - Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); - } - return ret; - } - - /** - * Checking if given container is a labeled container - * - * @param c - * @return true/false - */ - private boolean isLabeledContainer(RMContainer c) { - return labels.containsKey(c.getAllocatedNode()); + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + } } /** @@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * the leaves. Finally it aggregates pending resources in each queue and rolls * it up to higher levels. * - * @param root the root of the CapacityScheduler queue hierarchy - * @param clusterResources the total amount of resources in the cluster + * @param curQueue current queue which I'm looking at now + * @param partitionResource the total amount of resources in the cluster * @return the root of the cloned queue hierarchy */ - private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { - TempQueue ret; - synchronized (root) { - String queueName = root.getQueueName(); - float absUsed = root.getAbsoluteUsedCapacity(); - float absCap = root.getAbsoluteCapacity(); - float absMaxCap = root.getAbsoluteMaximumCapacity(); - boolean preemptionDisabled = root.getPreemptionDisabled(); + private TempQueuePerPartition cloneQueues(CSQueue curQueue, + Resource partitionResource, String partitionToLookAt) { + TempQueuePerPartition ret; + synchronized (curQueue) { + String queueName = curQueue.getQueueName(); + QueueCapacities qc = curQueue.getQueueCapacities(); + float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt); + float absCap = 
qc.getAbsoluteCapacity(partitionToLookAt); + float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); + boolean preemptionDisabled = curQueue.getPreemptionDisabled(); - Resource current = Resources.multiply(clusterResources, absUsed); - Resource guaranteed = Resources.multiply(clusterResources, absCap); - Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); + Resource current = Resources.multiply(partitionResource, absUsed); + Resource guaranteed = Resources.multiply(partitionResource, absCap); + Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); + + // when partition is a non-exclusive partition, the actual maxCapacity + // could more than specified maxCapacity + try { + if (!scheduler.getRMContext().getNodeLabelManager() + .isExclusiveNodeLabel(partitionToLookAt)) { + maxCapacity = + Resources.max(rc, partitionResource, maxCapacity, current); + } + } catch (IOException e) { + // This may cause by partition removed when running capacity monitor, + // just ignore the error, this will be corrected when doing next check. 
+ } Resource extra = Resource.newInstance(0, 0); - if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) { + if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) { extra = Resources.subtract(current, guaranteed); } - if (root instanceof LeafQueue) { - LeafQueue l = (LeafQueue) root; - Resource pending = l.getTotalResourcePending(); - ret = new TempQueue(queueName, current, pending, guaranteed, - maxCapacity, preemptionDisabled); + if (curQueue instanceof LeafQueue) { + LeafQueue l = (LeafQueue) curQueue; + Resource pending = + l.getQueueResourceUsage().getPending(partitionToLookAt); + ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, + maxCapacity, preemptionDisabled, partitionToLookAt); if (preemptionDisabled) { ret.untouchableExtra = extra; } else { @@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic ret.setLeafQueue(l); } else { Resource pending = Resource.newInstance(0, 0); - ret = new TempQueue(root.getQueueName(), current, pending, guaranteed, - maxCapacity, false); + ret = + new TempQueuePerPartition(curQueue.getQueueName(), current, pending, + guaranteed, maxCapacity, false, partitionToLookAt); Resource childrensPreemptable = Resource.newInstance(0, 0); - for (CSQueue c : root.getChildQueues()) { - TempQueue subq = cloneQueues(c, clusterResources); + for (CSQueue c : curQueue.getChildQueues()) { + TempQueuePerPartition subq = + cloneQueues(c, partitionResource, partitionToLookAt); Resources.addTo(childrensPreemptable, subq.preemptableExtra); ret.addChild(subq); } // untouchableExtra = max(extra - childrenPreemptable, 0) if (Resources.greaterThanOrEqual( - rc, clusterResources, childrensPreemptable, extra)) { + rc, partitionResource, childrensPreemptable, extra)) { ret.untouchableExtra = Resource.newInstance(0, 0); } else { ret.untouchableExtra = @@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } } 
} + addTempQueuePartition(ret); return ret; } // simple printout function that reports internal queue state (useful for // plotting) - private void logToCSV(List unorderedqueues){ - List queues = new ArrayList(unorderedqueues); - Collections.sort(queues, new Comparator(){ - @Override - public int compare(TempQueue o1, TempQueue o2) { - return o1.queueName.compareTo(o2.queueName); - }}); + private void logToCSV(List leafQueueNames){ + Collections.sort(leafQueueNames); String queueState = " QUEUESTATE: " + clock.getTime(); StringBuilder sb = new StringBuilder(); sb.append(queueState); - for (TempQueue tq : queues) { + + for (String queueName : leafQueueNames) { + TempQueuePerPartition tq = + getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL); sb.append(", "); tq.appendLogString(sb); } LOG.debug(sb.toString()); } + private void addTempQueuePartition(TempQueuePerPartition queuePartition) { + String queueName = queuePartition.queueName; + + Map queuePartitions; + if (null == (queuePartitions = queueToPartitions.get(queueName))) { + queuePartitions = new HashMap(); + queueToPartitions.put(queueName, queuePartitions); + } + queuePartitions.put(queuePartition.partition, queuePartition); + } + + /** + * Get queue partition by given queueName and partitionName + */ + private TempQueuePerPartition getQueueByPartition(String queueName, + String partition) { + Map partitionToQueues = null; + if (null == (partitionToQueues = queueToPartitions.get(queueName))) { + return null; + } + return partitionToQueues.get(partition); + } + + /** + * Get all queue partitions by given queueName + */ + private Collection getQueuePartitions(String queueName) { + if (!queueToPartitions.containsKey(queueName)) { + return null; + } + return queueToPartitions.get(queueName).values(); + } + /** * Temporary data-structure tracking resource availability, pending resource - * need, current utilization. Used to clone {@link CSQueue}. + * need, current utilization. 
This is per-queue-per-partition data structure */ - static class TempQueue { + static class TempQueuePerPartition { final String queueName; final Resource current; final Resource pending; final Resource guaranteed; final Resource maxCapacity; + final String partition; Resource idealAssigned; Resource toBePreempted; + // For logging purpose Resource actuallyPreempted; Resource untouchableExtra; Resource preemptableExtra; double normalizedGuarantee; - final ArrayList children; + final ArrayList children; LeafQueue leafQueue; boolean preemptionDisabled; - TempQueue(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) { + TempQueuePerPartition(String queueName, Resource current, Resource pending, + Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, + String partition) { this.queueName = queueName; this.current = current; this.pending = pending; @@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic this.actuallyPreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList(); + this.children = new ArrayList(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; + this.partition = partition; } public void setLeafQueue(LeafQueue l){ @@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * When adding a child we also aggregate its pending resource needs. 
* @param q the child queue to add to this queue */ - public void addChild(TempQueue q) { + public void addChild(TempQueuePerPartition q) { assert leafQueue == null; children.add(q); Resources.addTo(pending, q.pending); } - public void addChildren(ArrayList queues) { + public void addChildren(ArrayList queues) { assert leafQueue == null; children.addAll(queues); } - public ArrayList getChildren(){ + public ArrayList getChildren(){ return children; } @@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public void printAll() { LOG.info(this.toString()); - for (TempQueue sub : this.getChildren()) { + for (TempQueuePerPartition sub : this.getChildren()) { sub.printAll(); } } @@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } - static class TQComparator implements Comparator { + static class TQComparator implements Comparator { private ResourceCalculator rc; private Resource clusterRes; @@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } @Override - public int compare(TempQueue tq1, TempQueue tq2) { + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { return -1; } @@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // Calculates idealAssigned / guaranteed // TempQueues with 0 guarantees are always considered the most over // capacity and therefore considered last for resources. 
- private double getIdealPctOfGuaranteed(TempQueue q) { + private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { double pctOver = Integer.MAX_VALUE; if (q != null && Resources.greaterThan( rc, clusterRes, q.guaranteed, Resources.none())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2750d4e4af7..316a450575f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @SuppressWarnings({"unchecked", "rawtypes"}) -public class RMContainerImpl implements RMContainer { +public class RMContainerImpl implements RMContainer, Comparable { private static final Log LOG = LogFactory.getLog(RMContainerImpl.class); @@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer { } return nodeLabelExpression; } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RMContainer) { + if (null != getContainerId()) { + return getContainerId().equals(((RMContainer) obj).getContainerId()); + } + } + return false; + } + + @Override + public int hashCode() { + if (null != getContainerId()) { + return getContainerId().hashCode(); + } + return super.hashCode(); + } + + @Override + public int compareTo(RMContainer o) { + if (containerId != null && o.getContainerId() != null) { + return 
containerId.compareTo(o.getContainerId()); + } + return -1; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1e1623d7393..48c7f2fd4a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -153,7 +153,7 @@ public class CapacityScheduler extends static final PartitionedQueueComparator partitionedQueueComparator = new PartitionedQueueComparator(); - static final Comparator applicationComparator = + public static final Comparator applicationComparator = new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 22aafaaaaa1..56ade8488b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; @@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue { private final QueueResourceLimitsInfo queueResourceLimitsInfo = new QueueResourceLimitsInfo(); - + private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; private OrderingPolicy orderingPolicy = new FifoOrderingPolicy(); + + // record all ignore partition exclusivityRMContainer, this will be used to do + // preemption, key is the partition of the RMContainer allocated on + private Map> ignorePartitionExclusivityRMContainers = + new HashMap<>(); public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue { Resource assigned = assignment.getResource(); if (Resources.greaterThan( resourceCalculator, clusterResource, assigned, Resources.none())) { + // 
Get reserved or allocated container from application + RMContainer reservedOrAllocatedRMContainer = + application.getRMContainer(assignment + .getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId()); // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - node.getPartition()); + node.getPartition(), reservedOrAllocatedRMContainer); // Don't reset scheduling opportunities for offswitch assignments // otherwise the app will be delayed for each non-local assignment. @@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue { orderingPolicy.containerReleased(application, rmContainer); releaseResource(clusterResource, application, - container.getResource(), node.getPartition()); + container.getResource(), node.getPartition(), rmContainer); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue { synchronized void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, - String nodePartition) { + String nodePartition, RMContainer rmContainer) { super.allocateResource(clusterResource, resource, nodePartition); + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) + && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + TreeSet rmContainers = null; + if (null == (rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition))) { + rmContainers = new TreeSet<>(); + ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers); + } + rmContainers.add(rmContainer); + } + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue { } } - synchronized void releaseResource(Resource clusterResource, - 
FiCaSchedulerApp application, Resource resource, String nodePartition) { + synchronized void releaseResource(Resource clusterResource, + FiCaSchedulerApp application, Resource resource, String nodePartition, + RMContainer rmContainer) { super.releaseResource(clusterResource, resource, nodePartition); + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) + && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { + Set rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition); + rmContainers.remove(rmContainer); + if (rmContainers.isEmpty()) { + ignorePartitionExclusivityRMContainers.remove(nodePartition); + } + } + } + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), rmContainer); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); 
releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -1982,6 +2021,17 @@ public class LeafQueue extends AbstractCSQueue { } } + /** + * return all ignored partition exclusivity RMContainers in the LeafQueue, this + * will be used by preemption policy, and use of return + * ignorePartitionExclusivityRMContainer should protected by LeafQueue + * synchronized lock + */ + public synchronized Map> + getIgnoreExclusivityRMContainers() { + return ignorePartitionExclusivityRMContainers; + } + public void setCapacity(float capacity) { queueCapacities.setCapacity(capacity); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index c5c067d9e1d..51582553928 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -18,16 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.ContainerId; -import 
org.apache.hadoop.yarn.api.records.Resource; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + @InterfaceAudience.Private @InterfaceStability.Unstable public class AssignmentInformation { @@ -117,4 +118,24 @@ public class AssignmentInformation { public List getReservationDetails() { return operationDetails.get(Operation.RESERVATION); } + + private ContainerId getFirstContainerIdFromOperation(Operation op) { + if (null != operationDetails.get(Operation.ALLOCATION)) { + List assignDetails = + operationDetails.get(Operation.ALLOCATION); + if (!assignDetails.isEmpty()) { + return assignDetails.get(0).containerId; + } + } + return null; + } + + public ContainerId getFirstAllocatedOrReservedContainerId() { + ContainerId containerId = null; + containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION); + if (null != containerId) { + return containerId; + } + return getFirstContainerIdFromOperation(Operation.RESERVATION); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 9e8b7698eb0..6c0ed6c4716 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -37,27 +38,17 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Iterator; -import java.util.Map; import java.util.NavigableSet; import java.util.Random; -import java.util.Set; import java.util.StringTokenizer; import java.util.TreeSet; -import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; -import org.mortbay.log.Log; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestProportionalCapacityPreemptionPolicy { @@ -798,50 +793,6 @@ public class TestProportionalCapacityPreemptionPolicy { setAMContainer = false; } 
- @Test - public void testIdealAllocationForLabels() { - int[][] qData = new int[][] { - // / A B - { 80, 40, 40 }, // abs - { 80, 80, 80 }, // maxcap - { 80, 80, 0 }, // used - { 70, 20, 50 }, // pending - { 0, 0, 0 }, // reserved - { 5, 4, 1 }, // apps - { -1, 1, 1 }, // req granularity - { 2, 0, 0 }, // subqueues - }; - setAMContainer = true; - setLabeledContainer = true; - Map> labels = new HashMap>(); - NodeId node = NodeId.newInstance("node1", 0); - Set labelSet = new HashSet(); - labelSet.add("x"); - labels.put(node, labelSet); - when(lm.getNodeLabels()).thenReturn(labels); - ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); - // Subtracting Label X resources from cluster resources - when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( - Resources.clone(Resource.newInstance(80, 0))); - clusterResources.setMemory(100); - policy.editSchedule(); - - // By skipping AM Container and Labeled container, all other 18 containers - // of appD will be - // preempted - verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD))); - - // By skipping AM Container and Labeled container, all other 18 containers - // of appC will be - // preempted - verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC))); - - // rest 4 containers from appB will be preempted - verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); - setAMContainer = false; - setLabeledContainer = false; - } - @Test public void testPreemptSkippedAMContainers() { int[][] qData = new int[][] { @@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy { clusterResources = Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); when(mCS.getClusterResource()).thenReturn(clusterResources); + when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + clusterResources); + + SchedulerNode mNode = mock(SchedulerNode.class); + 
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL); + when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode); return policy; } @@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy { float tot = leafAbsCapacities(abs, queues); Deque pqs = new LinkedList(); ParentQueue root = mockParentQueue(null, queues[0], pqs); - when(root.getQueueName()).thenReturn("/"); + when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT); when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); - when(root.getQueuePath()).thenReturn("root"); + QueueCapacities rootQc = new QueueCapacities(true); + rootQc.setAbsoluteUsedCapacity(used[0] / tot); + rootQc.setAbsoluteCapacity(abs[0] / tot); + rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot); + when(root.getQueueCapacities()).thenReturn(rootQc); + when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT); boolean preemptionDisabled = mockPreemptionStatus("root"); when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled); @@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy { when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); + + // We need to make these fields to QueueCapacities + QueueCapacities qc = new QueueCapacities(false); + qc.setAbsoluteUsedCapacity(used[i] / tot); + qc.setAbsoluteCapacity(abs[i] / tot); + qc.setAbsoluteMaximumCapacity(maxCap[i] / tot); + when(q.getQueueCapacities()).thenReturn(qc); + String parentPathName = p.getQueuePath(); parentPathName = (parentPathName == null) ? 
"root" : parentPathName; String queuePathName = (parentPathName+"."+queueName).replace("/","root"); @@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy { return pq; } + @SuppressWarnings("rawtypes") LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); @@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy { new ArrayList(); when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setPending(Resource.newInstance(pending[i], 0)); + when(lq.getQueueResourceUsage()).thenReturn(ru); // consider moving where CapacityScheduler::comparator accessible final NavigableSet qApps = new TreeSet( new Comparator() { @@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy { when(mC.getContainerId()).thenReturn(cId); when(mC.getContainer()).thenReturn(c); when(mC.getApplicationAttemptId()).thenReturn(appAttId); + when(mC.getAllocatedResource()).thenReturn(r); if (priority.AMCONTAINER.getValue() == cpriority) { when(mC.isAMContainer()).thenReturn(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java new file mode 100644 index 00000000000..e13320cc1d4 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -0,0 +1,1211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import 
org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +public class TestProportionalCapacityPreemptionPolicyForNodePartitions { + private static final Log LOG = + LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class); + static final String ROOT = CapacitySchedulerConfiguration.ROOT; + 
+ private Map nameToCSQueues = null; + private Map partitionToResource = null; + private Map nodeIdToSchedulerNodes = null; + private RMNodeLabelsManager nlm = null; + private RMContext rmContext = null; + + private ResourceCalculator rc = new DefaultResourceCalculator(); + private Clock mClock = null; + private Configuration conf = null; + private CapacitySchedulerConfiguration csConf = null; + private CapacityScheduler cs = null; + private EventHandler mDisp = null; + private ProportionalCapacityPreemptionPolicy policy = null; + private Resource clusterResource = null; + + @SuppressWarnings("unchecked") + @Before + public void setup() { + org.apache.log4j.Logger.getRootLogger().setLevel( + org.apache.log4j.Level.DEBUG); + + conf = new Configuration(false); + conf.setLong(WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(MONITORING_INTERVAL, 3000); + // report "ideal" preempt + conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0); + conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + // FairScheduler doesn't support this test, + // Set CapacityScheduler as the scheduler for this test. 
+ conf.set("yarn.resourcemanager.scheduler.class", + CapacityScheduler.class.getName()); + + mClock = mock(Clock.class); + cs = mock(CapacityScheduler.class); + when(cs.getResourceCalculator()).thenReturn(rc); + + nlm = mock(RMNodeLabelsManager.class); + mDisp = mock(EventHandler.class); + + rmContext = mock(RMContext.class); + when(rmContext.getNodeLabelManager()).thenReturn(nlm); + csConf = new CapacitySchedulerConfiguration(); + when(cs.getConfiguration()).thenReturn(csConf); + when(cs.getRMContext()).thenReturn(rmContext); + + policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock); + partitionToResource = new HashMap<>(); + nodeIdToSchedulerNodes = new HashMap<>(); + nameToCSQueues = new HashMap<>(); + } + + @Test + public void testBuilder() throws Exception { + /** + * Test of test, make sure we build expected mock schedulable objects + */ + String labelsConfig = + "=200,true;" + // default partition + "red=100,false;" + // partition=red + "blue=200,true"; // partition=blue + String nodesConfig = + "n1=red;" + // n1 has partition=red + "n2=blue;" + // n2 has partition=blue + "n3="; // n3 doesn't have partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root + "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a + "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1 + "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2 + "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])"; + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated) + "a1\t" // app1 in a1 + + "(1,1,n3,red,50,false);" + // 50 * default in n3 + + "a1\t" // app2 in a1 + + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved), + // 50 * ignore-exclusivity (allocated) + + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 
in n2 (reserved), + // 50 in n2 (allocated) + "a2\t" // app3 in a2 + + "(1,1,n3,red,50,false);" + // 50 * default in n3 + + "b\t" // app4 in b + + "(1,1,n1,red,100,false);"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + + // Check queues: + // root + checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f); + checkPendingResource(cs.getQueue("root"), "", 100); + checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f); + checkPendingResource(cs.getQueue("root"), "red", 100); + checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f); + checkPendingResource(cs.getQueue("root"), "blue", 200); + + // a + checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f); + checkPendingResource(cs.getQueue("a"), "", 100); + checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f); + checkPendingResource(cs.getQueue("a"), "red", 0); + checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f); + checkPendingResource(cs.getQueue("a"), "blue", 200); + + // a1 + checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f); + checkPendingResource(cs.getQueue("a1"), "", 100); + checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f); + checkPendingResource(cs.getQueue("a1"), "red", 0); + checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f); + checkPendingResource(cs.getQueue("a1"), "blue", 0); + + // a2 + checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f); + checkPendingResource(cs.getQueue("a2"), "", 0); + checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f); + checkPendingResource(cs.getQueue("a2"), "red", 0); + checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f); + checkPendingResource(cs.getQueue("a2"), "blue", 200); + + // b1 + checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f); + checkPendingResource(cs.getQueue("b"), "", 0); + checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f); + checkPendingResource(cs.getQueue("b"), "red", 100); + checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f); + 
checkPendingResource(cs.getQueue("b"), "blue", 0); + + // Check ignored partitioned containers in queue + Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1")) + .getIgnoreExclusivityRMContainers().get("blue").size()); + + // Check applications + Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size()); + Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size()); + Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size()); + + // Check #containers + FiCaSchedulerApp app1 = getApp("a1", 1); + FiCaSchedulerApp app2 = getApp("a1", 2); + FiCaSchedulerApp app3 = getApp("a2", 3); + FiCaSchedulerApp app4 = getApp("b", 4); + + Assert.assertEquals(50, app1.getLiveContainers().size()); + checkContainerNodesInApp(app1, 50, "n3"); + + Assert.assertEquals(50, app2.getLiveContainers().size()); + Assert.assertEquals(150, app2.getReservedContainers().size()); + checkContainerNodesInApp(app2, 200, "n2"); + + Assert.assertEquals(50, app3.getLiveContainers().size()); + checkContainerNodesInApp(app3, 50, "n3"); + + Assert.assertEquals(100, app4.getLiveContainers().size()); + checkContainerNodesInApp(app4, 100, "n1"); + } + + @Test + public void testNodePartitionPreemptionRespectGuaranteedCapacity() + throws IOException { + /** + * The simplest test of node label, Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL. 4 applications in the cluster, + * app1/app2 in a, and app3/app4 in b. + * app1 uses 80 x, app2 uses 20 NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL. + * Both a/b have 50 pending resource for x and NO_LABEL + * + * After preemption, it should preempt 30 from app1, and 30 from app4. + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[50 100 20 50],x=[50 100 80 50]);" + // a + "-b(=[50 100 80 50],x=[50 100 20 50])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,80,false);" + // 80 * x in n1 + "a\t" // app2 in a + + "(1,1,n2,,20,false);" + // 20 default in n2 + "b\t" // app3 in b + + "(1,1,n1,x,20,false);" + // 20 * x in n1 + "b\t" // app4 in b + + "(1,1,n2,,80,false)"; // 80 default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 30 preempted from app1, 30 preempted from app4, and nothing preempted + // from app2/app3 + verify(mDisp, times(30)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(30)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(4)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } + + @Test + public void testNodePartitionPreemptionRespectMaximumCapacity() + throws IOException { + /** + * Queue structure is: + * + *
+     *         root
+     *       /  |  \
+     *      a   b   c
+     * 
+ * + * Both a/b/c can access x, and guaranteed_capacity(x) of them is 80:10:10. + * a's max resource is 80, b's max resource is 100, and c's max resource is 30. + * + * Two nodes, n1 has 100 x, n2 has 100 NO_LABEL. + * + * 2 apps in cluster. + * app1 in b and app2 in c. + * + * app1 uses 90x, and app2 uses 10x. After preemption, app2 will preempt 20x + * from app1 because of max capacity. + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[80 80 0 0],x=[80 80 0 0]);" + // a + "-b(=[10 100 0 0],x=[10 100 90 50]);" + // b + "-c(=[10 100 0 0],x=[10 30 10 50])"; //c + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t" // app1 in b + + "(1,1,n1,x,90,false);" + // 90 * x in n1 + "c\t" // app2 in c + + "(1,1,n1,x,10,false)"; // 10 * x in n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 20 preempted from app1, and nothing preempted + // from app2 + verify(mDisp, times(20)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + } + + @Test + public void testNodePartitionPreemptionOfIgnoreExclusivityAndRespectCapacity() + throws IOException { + /** + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL and 2 applications in the cluster, + * app1/app2 in a + * app1 uses 20x (ignoreExclusivity), app2 uses 80x (respectExclusivity). + * + * b has 100 pending resource of x + * + * After preemption, it should preempt 20 from app1, and 30 from app2. + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,false"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[50 100 0 0],x=[50 100 100 50]);" + // a + "-b(=[50 100 0 0],x=[50 100 0 100])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,1,false)" // 1 * x in n1 (it's AM container) + + "(1,1,n1,,20,false);" + // 20 * x in n1 (ignoreExclusivity) + "a\t" // app2 in a + + "(1,1,n1,x,79,false)"; // 79 * x + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 20 preempted from app1, and 30 preempted + // from app2 + verify(mDisp, times(20)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(30)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + } + + @Test + public void testNodePartitionPreemptionOfSkippingAMContainer() + throws IOException { + /** + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Both a/b can access x, and guaranteed capacity of them is 20:80. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL and 5 applications in the cluster, + * app1/app2/app3/app4/app5 in a, each uses 20 resources. + * + * b has 100 pending resource of x + * + * After preemption, it should preempt 19 from app[5-2] and 4 from app1 + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[50 100 0 0],x=[20 100 100 50]);" + // a + "-b(=[50 100 0 0],x=[80 100 0 100])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app2 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app3 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app4 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app5 in a + + "(1,1,n1,x,20,false);"; // uses 20 resource + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 4 from app1 + verify(mDisp, times(4)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + // 19 from app2-app5 + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(4)))); + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(5)))); + } + + @Test + public void testNodePartitionPreemptionOfAMContainer() + throws IOException { + /** + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Both a/b can access x, and guaranteed capacity of them is 3:97. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL. + * + * app1/app2/app3/app4/app5 in a, each uses 20 resources(x) + * + * b has 100 pending resource of x + * + * After preemption, it should preempt 20 from app4/app5 and 19 from + * app1-app3. App4/app5's AM container will be preempted + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[50 100 0 0],x=[3 100 100 50]);" + // a + "-b(=[50 100 0 0],x=[97 100 0 100])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app2 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app3 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app4 in a + + "(1,1,n1,x,20,false);" + // uses 20 resource + "a\t" // app5 in a + + "(1,1,n1,x,20,false);"; // uses 20 resource + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 19 from app1 + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + // 19 from app2-app3, 20 from app4-app5 (AM containers included) + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, times(19)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + verify(mDisp, times(20)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(4)))); + verify(mDisp, times(20)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(5)))); + } + + @Test + public void testNodePartitionDisablePreemptionForSingleLevelQueue() + throws IOException { + /** + * Queue structure is: + * + *
+     *         root
+     *       /  |  \
+     *      a   b   c
+     * 
+ * + * Both a/b/c can access x, and guaranteed_capacity(x) of them is 40:20:40. + * a/b/c's max resource is 100. b has preemption disabled. + * + * Two nodes, n1 has 100 x, n2 has 100 NO_LABEL. + * + * 3 apps in cluster. app1 in a (usage=50), app2 in b(usage=30), app3 in + * c(usage=20). a and c have 50 pending resource, b has none. + * + * After preemption, app1 will be preempted 10 containers and app2 will not be + * preempted + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[80 80 0 0],x=[40 100 50 50]);" + // a + "-b(=[10 100 0 0],x=[20 100 30 0]);" + // b + "-c(=[10 100 0 0],x=[40 100 20 50])"; //c + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,50,false);" + // 50x in n1 + "b\t" // app2 in b + + "(1,1,n1,x,30,false);" + // 30x in n1 + "c\t" // app3 in c + + "(1,1,n1,x,20,false)"; // 20x in n1 + + csConf.setPreemptionDisabled("root.b", true); + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 preempted from app1, nothing preempted from app2-app3 + verify(mDisp, times(10)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } + + @Test + public void testNodePartitionNonAccessibleQueuesSharePartitionedResource() + throws IOException { + /** + * Queue structure is: + * + *
+     *           root
+     *        _________
+     *       /  |   |  \
+     *      a   b   c   d
+     * 
+ * + * a/b can access x, their capacity is 50:50. c/d cannot access x. + * + * a uses 0, wants 30 + * b(app1) uses 30, wants 0 + * c(app2)&d(app3) use 35, wants 50 + * + * After preemption, c/d will be preempted 15 containers, because idle + * resource = 100 - 30 (which is used by b) - 30 (which is asked by a) = 40 + * will be divided by c/d, so each of c/d get 20. + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,false"; // partition=x + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + //root + "-a(=[25 100 0 0],x=[50 100 0 30]);" + // a + "-b(=[25 100 0 0],x=[50 100 30 0]);" + // b + "-c(=[25 100 1 0],x=[0 0 35 50]);" + //c + "-d(=[25 100 1 0],x=[0 0 35 50])"; //d + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t" // app1 in b + + "(1,1,n1,x,30,false);" + // 30x in n1 + "c\t" // app2 in c + + "(1,1,n2,,1,false)" // AM container (in n2) + + "(1,1,n1,,30,false);" + // 30x in n1 (ignore exclusivity) + "d\t" // app3 in d + + "(1,1,n2,,1,false)" // AM container (in n2) + + "(1,1,n1,,30,false)"; // 30x in n1 (ignore exclusivity) + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 15 will be preempted from each of app2/app3 + verify(mDisp, times(15)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, times(15)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + } + + @Test + public void testHierarchyPreemptionForMultiplePartitions() + throws IOException { + /** + * Queue structure is: + * + *
+     *           root
+     *           /  \
+     *          a    b
+     *        /  \  /  \
+     *       a1  a2 b1  b2
+     * 
+ * + * Both a/b can access x/y, and in all hierarchy capacity ratio is 50:50. + * So for a1/a2/b1/b2, all of them can access 25x, 25y + * + * a1 uses 35x, 25y + * a2 uses 25x, 15y + * b1 uses 15x, 25y + * b2 uses 25x 35y + * + * So as a result, a2 will preempt from b2, and b1 will preempt from a1. + * + * After preemption, a1 will be preempted 10x and b2 will be preempted 10y. + */ + String labelsConfig = + "=100,true;" + // default partition + "x=100,true;" + // partition=x + "y=100,true"; // partition=y + String nodesConfig = + "n1=x;" + // n1 has partition=x + "n2=y;" + // n2 has partition=y + "n3="; // n3 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root + "-a(=[50 100 0 0],x=[50 100 60 40],y=[50 100 40 40]);" + // a + "--a1(=[25 100 0 0],x=[25 100 35 20],y=[25 100 25 20]);" + // a1 + "--a2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 15 20]);" + // a2 + "-b(=[50 100 0 0],x=[50 100 40 40],y=[50 100 60 40]);" + // b + "--b1(=[25 100 0 0],x=[25 100 15 20],y=[25 100 25 20]);" + // b1 + "--b2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 35 20])"; // b2 + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a1\t" // app1 in a1 + + "(1,1,n1,x,35,false)" // 35 of x + + "(1,1,n2,y,25,false);" + // 25 of y + "a2\t" // app2 in a2 + + "(1,1,n1,x,25,false)" // 25 of x + + "(1,1,n2,y,15,false);" + // 15 of y + "b1\t" // app3 in b1 + + "(1,1,n1,x,15,false)" // 15 of x + + "(1,1,n2,y,25,false);" + // 25 of y + "b2\t" // app4 in b2 + + "(1,1,n1,x,25,false)" // 25 of x + + "(1,1,n2,y,35,false)"; // 35 of y + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 will be preempted from app1 (a1) /app4 (b2) + verify(mDisp, times(10)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(10)).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(4)))); + 
verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()).handle( + argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } + + @Test + public void testHierarchyPreemptionForDifferenceAcessibility() + throws IOException { + /** + * Queue structure is: + * + *
+     *           root
+     *           /  \
+     *          a    b
+     *        /  \  /  \
+     *       a1  a2 b1  b2
+     * 
+     *
+     * a can access x only and b can access y only
+     *
+     * Capacities of a1/a2, b1/b2 is 50:50
+     *
+     * a1 uses 100x and b1 uses 80y
+     *
+     * So as a result, a1 will be preempted 50 containers and b1 will be
+     * preempted 30 containers
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true;" + // partition=x
+        "y=100,true"; // partition=y
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2=y;" + // n2 has partition=y
+        "n3="; // n3 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[100 100 100 100]);" + // a
+        "--a1(=[25 100 0 0],x=[50 100 100 0]);" + // a1
+        "--a2(=[25 100 0 0],x=[50 100 0 100]);" + // a2
+        "-b(=[50 100 0 0],y=[100 100 80 100]);" + // b
+        "--b1(=[25 100 0 0],y=[50 100 80 0]);" + // b1
+        "--b2(=[25 100 0 0],y=[50 100 0 100])"; // b2
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n1,x,100,false);" + // 100 of x
+        "b1\t" // app2 in b1
+        + "(1,1,n2,y,80,false)"; // 80 of y
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(50)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  /**
+   * Builds the id of attempt 1 of the application numbered {@code id}, with a
+   * cluster timestamp of 0.
+   *
+   * @param id application number
+   * @return the attempt id for that application
+   */
+  private ApplicationAttemptId getAppAttemptId(int id) {
+    ApplicationId appId = ApplicationId.newInstance(0L, id);
+    return ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  /**
+   * Asserts how many containers (live plus reserved) of {@code app} sit on
+   * the node {@code host}:1.
+   *
+   * @param app application whose containers are inspected
+   * @param expectedContainersNumber expected live + reserved container count
+   * @param host host name of the node; port 1 is always used
+   */
+  private void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    NodeId nodeId = NodeId.newInstance(host, 1);
+    int num = 0;
+    for (RMContainer live : app.getLiveContainers()) {
+      if (live.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    for (RMContainer reserved : app.getReservedContainers()) {
+      if (reserved.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedContainersNumber, num);
+  }
+
+  /**
+   * Looks up an application of a leaf queue by its numeric application id.
+   *
+   * @param queueName name of a leaf queue known to the mocked scheduler
+   * @param appId numeric id of the application
+   * @return the matching application, or {@code null} if the queue has none
+   */
+  private FiCaSchedulerApp getApp(String queueName, int appId) {
+    LeafQueue queue = (LeafQueue) cs.getQueue(queueName);
+    for (FiCaSchedulerApp app : queue.getApplications()) {
+      if (app.getApplicationId().getId() == appId) {
+        return app;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Asserts the absolute guaranteed/maximum/used capacities of a queue for
+   * one partition, with a tolerance of 1e-3.
+   */
+  private void checkAbsCapacities(CSQueue queue, String partition,
+      float guaranteed, float max, float used) {
+    QueueCapacities qc = queue.getQueueCapacities();
+    Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
+    Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
+    Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
+  }
+
+  /**
+   * Asserts the pending memory of a queue for one partition.
+   */
+  private void checkPendingResource(CSQueue queue, String partition,
+      int pending) {
+    ResourceUsage ru = queue.getQueueResourceUsage();
+    Assert.assertEquals(pending, ru.getPending(partition).getMemory());
+  }
+
+  /**
+   * Builds the whole mocked environment from the four configuration strings
+   * and creates the policy under test. Partitions are mocked first, then
+   * nodes, then queues, and applications last.
+   *
+   * @param labelsConfig partitions, see {@code mockNodeLabelsManager}
+   * @param nodesConfig scheduler nodes, see {@code mockSchedulerNodes}
+   * @param queuesConfig queue hierarchy, see {@code mockQueueHierarchy}
+   * @param appsConfig applications, see {@code mockApplications}
+   */
+  private void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig) throws IOException {
+    mockNodeLabelsManager(labelsConfig);
+    mockSchedulerNodes(nodesConfig);
+    for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
+      when(cs.getSchedulerNode(nodeId)).thenReturn(
+          nodeIdToSchedulerNodes.get(nodeId));
+    }
+    ParentQueue root = mockQueueHierarchy(queuesConfig);
+    when(cs.getRootQueue()).thenReturn(root);
+    when(cs.getClusterResource()).thenReturn(clusterResource);
+    mockApplications(appsConfig);
+
+    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+  }
+
+  /**
+   * Creates the mocked containers of one application attempt. The
+   * specification is a sequence of groups
+   * {@code (priority,resource,host,expression,repeat,reserved)}; each group
+   * yields {@code repeat} identical containers. The first container created
+   * for the attempt is flagged as the AM container. A container with an
+   * empty expression placed on a node of a non-default partition is also
+   * registered in the queue's ignore-exclusivity containers.
+   *
+   * @param containersConfig container groups of a single application
+   * @param attemptId attempt owning the containers
+   * @param queueName leaf queue the application runs in
+   * @param reservedContainers receives containers declared as reserved
+   * @param liveContainers receives all other containers
+   * @throws IllegalArgumentException if the specification is malformed or
+   *           names a host that was not mocked
+   */
+  private void mockContainers(String containersConfig,
+      ApplicationAttemptId attemptId, String queueName,
+      List<RMContainer> reservedContainers,
+      List<RMContainer> liveContainers) {
+    int containerId = 1;
+    // Scanning starts right after the first '=' if there is one, otherwise
+    // at position 0 (indexOf returns -1).
+    int start = containersConfig.indexOf("=") + 1;
+
+    while (start < containersConfig.length()) {
+      // Locate the next "(...)" group.
+      while (start < containersConfig.length()
+          && containersConfig.charAt(start) != '(') {
+        start++;
+      }
+      if (start >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+      int end = start + 1;
+      while (end < containersConfig.length()
+          && containersConfig.charAt(end) != ')') {
+        end++;
+      }
+      if (end >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+
+      // now we found start/end, get container values
+      String[] values = containersConfig.substring(start + 1, end).split(",");
+      if (values.length != 6) {
+        throw new IllegalArgumentException("Format to define container is:"
+            + "(priority,resource,host,expression,repeat,reserved)");
+      }
+      Priority pri = Priority.newInstance(Integer.parseInt(values[0]));
+      Resource res = Resources.createResource(Integer.parseInt(values[1]));
+      NodeId host = NodeId.newInstance(values[2], 1);
+      String exp = values[3];
+      int repeat = Integer.parseInt(values[4]);
+      boolean reserved = Boolean.parseBoolean(values[5]);
+
+      for (int i = 0; i < repeat; i++) {
+        Container c = mock(Container.class);
+        when(c.getResource()).thenReturn(res);
+        when(c.getPriority()).thenReturn(pri);
+        RMContainerImpl rmc = mock(RMContainerImpl.class);
+        when(rmc.getAllocatedNode()).thenReturn(host);
+        when(rmc.getNodeLabelExpression()).thenReturn(exp);
+        when(rmc.getAllocatedResource()).thenReturn(res);
+        when(rmc.getContainer()).thenReturn(c);
+        when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
+        final ContainerId cId =
+            ContainerId.newContainerId(attemptId, containerId);
+        when(rmc.getContainerId()).thenReturn(cId);
+        // Mocks have no real ordering; compare by container id so the
+        // containers can be kept in sorted collections.
+        doAnswer(new Answer<Integer>() {
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
+                .getContainerId());
+          }
+        }).when(rmc).compareTo(any(RMContainer.class));
+
+        if (containerId == 1) {
+          when(rmc.isAMContainer()).thenReturn(true);
+        }
+
+        if (reserved) {
+          reservedContainers.add(rmc);
+        } else {
+          liveContainers.add(rmc);
+        }
+
+        // If this is a non-exclusive allocation (no label requested, but
+        // the node belongs to a non-default partition)
+        String partition = null;
+        if (exp.isEmpty()) {
+          if (!nodeIdToSchedulerNodes.containsKey(host)) {
+            throw new IllegalArgumentException(
+                "Unknown host in containers specification, host=" + values[2]);
+          }
+          partition = nodeIdToSchedulerNodes.get(host).getPartition();
+          if (!partition.isEmpty()) {
+            LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+            Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+                queue.getIgnoreExclusivityRMContainers();
+            TreeSet<RMContainer> containersOfPartition =
+                ignoreExclusivityContainers.get(partition);
+            if (containersOfPartition == null) {
+              containersOfPartition = new TreeSet<RMContainer>();
+              ignoreExclusivityContainers.put(partition,
+                  containersOfPartition);
+            }
+            containersOfPartition.add(rmc);
+          }
+        }
+        LOG.debug("add container to app=" + attemptId + " res=" + res
+            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+            + partition);
+
+        containerId++;
+      }
+
+      start = end + 1;
+    }
+  }
+
+  /**
+   * Format is:
+   *
+   * queueName\t  // app1
+   * (priority,resource,host,expression,#repeat,reserved)
+   * (priority,resource,host,expression,#repeat,reserved);
+   * queueName\t  // app2
+   * 
+   */
+  private void mockApplications(String appsConfig) {
+    // Application ids are assigned 1, 2, ... in the order the apps appear.
+    int id = 1;
+    for (String a : appsConfig.split(";")) {
+      // Each app is "queueName<TAB>containers".
+      String[] strs = a.split("\t");
+      if (strs.length < 2) {
+        throw new IllegalArgumentException(
+            "Format to define app is: queueName\\t(containers), app=" + a);
+      }
+      String queueName = strs[0];
+      if (!(nameToCSQueues.get(queueName) instanceof LeafQueue)) {
+        throw new IllegalArgumentException(
+            "App must be placed in an existing leaf queue, queue="
+                + queueName);
+      }
+
+      // get containers
+      List<RMContainer> liveContainers = new ArrayList<RMContainer>();
+      List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+      ApplicationId appId = ApplicationId.newInstance(0L, id);
+      ApplicationAttemptId appAttemptId =
+          ApplicationAttemptId.newInstance(appId, 1);
+
+      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+          liveContainers);
+
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getLiveContainers()).thenReturn(liveContainers);
+      when(app.getReservedContainers()).thenReturn(reservedContainers);
+      when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+      when(app.getApplicationId()).thenReturn(appId);
+
+      // add to LeafQueue
+      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+      queue.getApplications().add(app);
+
+      id++;
+    }
+  }
+
+  /**
+   * Mocks one scheduler node per entry and records it in
+   * {@code nodeIdToSchedulerNodes}. Every node uses port 1; an empty
+   * partition after the '=' is stored as the empty string.
+   *
+   * Format is:
+   * host1=partition;
+   * host2=partition;
+   *
+   * @throws IllegalArgumentException if an entry has no '='
+   */
+  private void mockSchedulerNodes(String schedulerNodesConfigStr)
+      throws IOException {
+    String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
+    for (String p : nodesConfigStrArray) {
+      int eq = p.indexOf("=");
+      if (eq < 0) {
+        throw new IllegalArgumentException(
+            "Format to define node is: host=partition, node=" + p);
+      }
+      NodeId nodeId = NodeId.newInstance(p.substring(0, eq), 1);
+      String partition = p.substring(eq + 1);
+
+      SchedulerNode sn = mock(SchedulerNode.class);
+      when(sn.getNodeID()).thenReturn(nodeId);
+      when(sn.getPartition()).thenReturn(partition);
+      nodeIdToSchedulerNodes.put(nodeId, sn);
+
+      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+    }
+  }
+
+  /**
+   * Format is:
+   *
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * 
+   */
+  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
+    String[] partitionSpecs = nodeLabelsConfigStr.split(";");
+    // The cluster resource is rebuilt as the sum of all partitions below.
+    clusterResource = Resources.createResource(0);
+    for (String spec : partitionSpecs) {
+      // spec looks like "name=total_resource,exclusivity"
+      String partitionName = spec.substring(0, spec.indexOf("="));
+      String totalStr =
+          spec.substring(spec.indexOf("=") + 1, spec.indexOf(","));
+      String exclusivityStr = spec.substring(spec.indexOf(",") + 1);
+      int totalResource = Integer.parseInt(totalStr);
+      boolean exclusivity = Boolean.parseBoolean(exclusivityStr);
+
+      // Stub the label manager's answers for this partition.
+      Resource res = Resources.createResource(totalResource);
+      when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
+          .thenReturn(res);
+      when(nlm.isExclusiveNodeLabel(eq(partitionName)))
+          .thenReturn(exclusivity);
+
+      // Remember the partition's resource for later capacity computations.
+      partitionToResource.put(partitionName, res);
+      LOG.debug("add partition=" + partitionName + " totalRes=" + res
+          + " exclusivity=" + exclusivity);
+      Resources.addTo(clusterResource, res);
+    }
+
+    when(nlm.getClusterNodeLabelNames()).thenReturn(
+        partitionToResource.keySet());
+  }
+
+  /**
+   * Format is:
+   *
+   * root (=[guaranteed max used pending],=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * 
+   * ";" splits queues, and there should be no empty lines and no extra spaces
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private ParentQueue mockQueueHierarchy(String queueExprs) {
+    String[] queueExprArray = queueExprs.split(";");
+    ParentQueue rootQueue = null;
+    for (int idx = 0; idx < queueExprArray.length; idx++) {
+      String q = queueExprArray[idx];
+      CSQueue queue;
+
+      // Initialize queue
+      if (isParent(queueExprArray, idx)) {
+        ParentQueue parentQueue = mock(ParentQueue.class);
+        queue = parentQueue;
+        // Children are appended by setupQueue as they are parsed.
+        List<CSQueue> children = new ArrayList<CSQueue>();
+        when(parentQueue.getChildQueues()).thenReturn(children);
+      } else {
+        LeafQueue leafQueue = mock(LeafQueue.class);
+        final TreeSet<FiCaSchedulerApp> apps =
+            new TreeSet<>(CapacityScheduler.applicationComparator);
+        when(leafQueue.getApplications()).thenReturn(apps);
+        OrderingPolicy so = mock(OrderingPolicy.class);
+        // Answer lazily: apps are added after the queue has been mocked.
+        when(so.getPreemptionIterator()).thenAnswer(new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) {
+            return apps.descendingIterator();
+          }
+        });
+        when(leafQueue.getOrderingPolicy()).thenReturn(so);
+
+        Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
+            new HashMap<>();
+        when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+            ignorePartitionContainers);
+        queue = leafQueue;
+      }
+
+      setupQueue(queue, q, queueExprArray, idx);
+      if (queue.getQueueName().equals(ROOT)) {
+        rootQueue = (ParentQueue) queue;
+      }
+    }
+    return rootQueue;
+  }
+
+  /**
+   * Stubs name, path, parent, capacities, pending resource and the
+   * preemption-disabled flag of one mocked queue, links it to its parent and
+   * registers it in {@code nameToCSQueues} and in the mocked scheduler.
+   *
+   * @param queue mocked queue to set up
+   * @param q queue expression, e.g. {@code --a1(=[25 100 0 0],x=[50 100 100 0])}
+   * @param queueExprArray all queue expressions, in declaration order
+   * @param idx index of {@code q} in {@code queueExprArray}
+   * @throws IllegalArgumentException if the expression is malformed or refers
+   *           to a partition that was not mocked
+   */
+  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+      int idx) {
+    LOG.debug("*** Setup queue, source=" + q);
+    String queuePath = null;
+
+    int myLevel = getLevel(q);
+    if (0 == myLevel) {
+      // It's root
+      when(queue.getQueueName()).thenReturn(ROOT);
+      queuePath = ROOT;
+    }
+
+    String queueName = getQueueName(q);
+    when(queue.getQueueName()).thenReturn(queueName);
+
+    // Setup parent queue, and add myself to parentQueue.children-list
+    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+    if (null != parentQueue) {
+      when(queue.getParent()).thenReturn(parentQueue);
+      parentQueue.getChildQueues().add(queue);
+
+      // Setup my path
+      queuePath = parentQueue.getQueuePath() + "." + queueName;
+    }
+    when(queue.getQueuePath()).thenReturn(queuePath);
+
+    QueueCapacities qc = new QueueCapacities(0 == myLevel);
+    ResourceUsage ru = new ResourceUsage();
+
+    when(queue.getQueueCapacities()).thenReturn(qc);
+    when(queue.getQueueResourceUsage()).thenReturn(ru);
+
+    LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+        + queue.getQueuePath());
+    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+        .getQueueName()));
+
+    // Setup other fields like used resource, guaranteed resource, etc.
+    String capacitySettingStr =
+        q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+    for (String s : capacitySettingStr.split(",")) {
+      String partitionName = s.substring(0, s.indexOf("="));
+      if (!partitionToResource.containsKey(partitionName)) {
+        throw new IllegalArgumentException("Unknown partition="
+            + partitionName + " in queue definition:" + q);
+      }
+      String[] values =
+          s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+      float partitionMemory =
+          (float) (partitionToResource.get(partitionName).getMemory());
+      // Add a small epsilon to capacities to avoid truncate when doing
+      // Resources.multiply
+      float epsilon = 1e-6f;
+      float absGuaranteed =
+          Integer.parseInt(values[0].trim()) / partitionMemory + epsilon;
+      float absMax =
+          Integer.parseInt(values[1].trim()) / partitionMemory + epsilon;
+      float absUsed =
+          Integer.parseInt(values[2].trim()) / partitionMemory + epsilon;
+      Resource pending =
+          Resources.createResource(Integer.parseInt(values[3].trim()));
+      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      ru.setPending(partitionName, pending);
+      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+          + ",abs_used=" + absUsed + ",pending_resource=" + pending + "]");
+    }
+
+    // Setup preemption disabled
+    when(queue.getPreemptionDisabled()).thenReturn(
+        csConf.getPreemptionDisabled(queuePath, false));
+
+    nameToCSQueues.put(queueName, queue);
+    when(cs.getQueue(eq(queueName))).thenReturn(queue);
+  }
+
+  /**
+   * Level of a queue is how many "-" at beginning, root's level is 0
+   */
+  private int getLevel(String q) {
+    int level = 0; // level = how many "-" at beginning
+    while (level < q.length() && q.charAt(level) == '-') {
+      level++;
+    }
+    return level;
+  }
+
+  /**
+   * Extracts the queue name from a queue expression: the text after the
+   * leading "-" characters and before the first '('.
+   *
+   * @throws IllegalArgumentException if the expression consists only of "-",
+   *           has no '(', or the name is empty or contains '.'
+   */
+  private String getQueueName(String q) {
+    // The name starts right after the leading '-' characters.
+    int nameStart = getLevel(q);
+    if (nameStart == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    int nameEnd = q.indexOf('(');
+    if (nameEnd < 0) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    // name = after '-' and before '('
+    String name = q.substring(nameStart, nameEnd);
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  /**
+   * Finds the parent of the queue at {@code idx}: the closest preceding
+   * expression with a smaller level. The parent must already be registered
+   * in {@code nameToCSQueues}.
+   *
+   * @return the parent queue, or {@code null} when no preceding expression
+   *         has a smaller level (as for root)
+   */
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx,
+      int myLevel) {
+    for (int i = idx - 1; i >= 0; i--) {
+      if (getLevel(queueExprArray[i]) < myLevel) {
+        String parentQueueName = getQueueName(queueExprArray[i]);
+        return (ParentQueue) nameToCSQueues.get(parentQueueName);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get if a queue is ParentQueue. Queues are listed depth-first, so a queue
+   * has children exactly when the entry right after it is at a deeper level.
+   * Only that entry is examined: looking past same-level siblings would
+   * mistake a leaf for a parent when a later sibling has children.
+   */
+  private boolean isParent(String[] queues, int idx) {
+    int next = idx + 1;
+    return next < queues.length
+        && getLevel(queues[next]) > getLevel(queues[idx]);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 2608dcb5d35..31661da8108 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -138,7 +138,7 @@ public class TestChildQueueOrder { } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource, null); + allocatedResource, null, null); } // Next call - nothing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 34248a47e27..1c8622fef6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -815,9 +815,9 @@ public class TestLeafQueue { qb.finishApplication(app_0.getApplicationId(), user_0); qb.finishApplication(app_2.getApplicationId(), user_1); qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority), - null); + null, null); qb.releaseResource(clusterResource, app_2, 
app_2.getResource(u1Priority), - null); + null, null); qb.setUserLimit(50); qb.setUserLimitFactor(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 46aa7ec484c..48d66020bf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.junit.Assert; import org.junit.Before; @@ -1015,6 +1017,20 @@ public class TestNodeLabelContainerAllocation { // app1 gets all resource in partition=x Assert.assertEquals(10, schedulerNode1.getNumContainers()); + // check non-exclusive containers of LeafQueue is correctly updated + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); + Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey( + "y")); + Assert.assertEquals(10, + leafQueue.getIgnoreExclusivityRMContainers().get("x").size()); + + // completes all containers of app1, ignoreExclusivityRMContainers should be + // updated as well. + cs.handle(new AppAttemptRemovedSchedulerEvent( + am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false)); + Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey( + "x")); + rm1.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index bdbd1687f35..4deaaaec6b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -150,7 +150,7 @@ public class TestParentQueue { } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); 
((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource, null); + allocatedResource, null, null); } // Next call - nothing