From 6a18ae849faa9becccd66b382296213a6c08842e Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 31 Oct 2016 15:18:31 -0700 Subject: [PATCH] YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda) (cherry-picked from commit 90dd3a8148468ac37a3f2173ad8d45e38bfcb0c9) --- ...AbstractPreemptableResourceCalculator.java | 244 +++++ .../capacity/AbstractPreemptionEntity.java | 98 ++ .../CapacitySchedulerPreemptionContext.java | 14 + .../CapacitySchedulerPreemptionUtils.java | 119 ++- .../capacity/FifoCandidatesSelector.java | 129 +-- .../FifoIntraQueuePreemptionPlugin.java | 459 +++++++++ .../IntraQueueCandidatesSelector.java | 238 +++++ .../IntraQueuePreemptionComputePlugin.java | 39 + .../PreemptableResourceCalculator.java | 183 +--- .../PreemptionCandidatesSelector.java | 32 +- .../ProportionalCapacityPreemptionPolicy.java | 86 +- .../monitor/capacity/TempAppPerPartition.java | 101 ++ .../capacity/TempQueuePerPartition.java | 142 ++- .../CapacitySchedulerConfiguration.java | 31 + .../scheduler/capacity/LeafQueue.java | 40 +- .../common/fica/FiCaSchedulerApp.java | 18 + ...CapacityPreemptionPolicyMockFramework.java | 128 ++- ...nalCapacityPreemptionPolicyIntraQueue.java | 868 ++++++++++++++++++ 18 files changed, 2553 insertions(+), 416 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java new file mode 100644 index 00000000000..8255a30e800 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector}. + */ +public class AbstractPreemptableResourceCalculator { + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + private boolean isReservedPreemptionCandidatesSelector; + + static class TQComparator implements Comparator { + private ResourceCalculator rc; + private Resource clusterRes; + + TQComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { + if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { + return -1; + } + if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { + return 1; + } + return 0; + } + + // Calculates idealAssigned / guaranteed + // TempQueues with 0 guarantees are always considered the most over + // capacity and therefore considered last for resources. + private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { + double pctOver = Integer.MAX_VALUE; + if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(), + Resources.none())) { + pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, + q.getGuaranteed()); + } + return (pctOver); + } + } + + /** + * PreemptableResourceCalculator constructor. + * + * @param preemptionContext context + * @param isReservedPreemptionCandidatesSelector + * this will be set by different implementation of candidate + * selectors, please refer to TempQueuePerPartition#offer for + * details. + */ + public AbstractPreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { + context = preemptionContext; + rc = preemptionContext.getResourceCalculator(); + this.isReservedPreemptionCandidatesSelector = + isReservedPreemptionCandidatesSelector; + } + + /** + * Given a set of queues compute the fix-point distribution of unassigned + * resources among them. As pending request of a queue are exhausted, the + * queue is removed from the set and remaining capacity redistributed among + * remaining queues. The distribution is weighted based on guaranteed + * capacity, unless asked to ignoreGuarantee, in which case resources are + * distributed uniformly. + * + * @param totGuarant + * total guaranteed resource + * @param qAlloc + * List of child queues + * @param unassigned + * Unassigned resource per queue + * @param ignoreGuarantee + * ignore guarantee per queue. + */ + protected void computeFixpointAllocation(Resource totGuarant, + Collection qAlloc, Resource unassigned, + boolean ignoreGuarantee) { + // Prior to assigning the unused resources, process each queue as follows: + // If current > guaranteed, idealAssigned = guaranteed + untouchable extra + // Else idealAssigned = current; + // Subtract idealAssigned resources from unassigned. + // If the queue has all of its needs met (that is, if + // idealAssigned >= current + pending), remove it from consideration. + // Sort queues from most under-guaranteed to most over-guaranteed. + TQComparator tqComparator = new TQComparator(rc, totGuarant); + PriorityQueue orderedByNeed = new PriorityQueue<>(10, + tqComparator); + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueuePerPartition q = i.next(); + Resource used = q.getUsed(); + + if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) { + q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra); + } else { + q.idealAssigned = Resources.clone(used); + } + Resources.subtractFrom(unassigned, q.idealAssigned); + // If idealAssigned < (allocated + used + pending), q needs more + // resources, so + // add it to the list of underserved queues, ordered by need. + Resource curPlusPend = Resources.add(q.getUsed(), q.pending); + if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) { + orderedByNeed.add(q); + } + } + + // assign all cluster resources until no more demand, or no resources are + // left + while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant, + unassigned, Resources.none())) { + Resource wQassigned = Resource.newInstance(0, 0); + // we compute normalizedGuarantees capacity based on currently active + // queues + resetCapacity(unassigned, orderedByNeed, ignoreGuarantee); + + // For each underserved queue (or set of queues if multiple are equally + // underserved), offer its share of the unassigned resources based on its + // normalized guarantee. After the offer, if the queue is not satisfied, + // place it back in the ordered list of queues, recalculating its place + // in the order of most under-guaranteed to most over-guaranteed. In this + // way, the most underserved queue(s) are always given resources first. + Collection underserved = getMostUnderservedQueues( + orderedByNeed, tqComparator); + for (Iterator i = underserved.iterator(); i + .hasNext();) { + TempQueuePerPartition sub = i.next(); + Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, + sub.normalizedGuarantee, Resource.newInstance(1, 1)); + Resource wQidle = sub.offer(wQavail, rc, totGuarant, + isReservedPreemptionCandidatesSelector); + Resource wQdone = Resources.subtract(wQavail, wQidle); + + if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) { + // The queue is still asking for more. Put it back in the priority + // queue, recalculating its order based on need. + orderedByNeed.add(sub); + } + Resources.addTo(wQassigned, wQdone); + } + Resources.subtractFrom(unassigned, wQassigned); + } + + // Sometimes its possible that, all queues are properly served. So intra + // queue preemption will not try for any preemption. How ever there are + // chances that within a queue, there are some imbalances. Hence make sure + // all queues are added to list. + while (!orderedByNeed.isEmpty()) { + TempQueuePerPartition q1 = orderedByNeed.remove(); + context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); + } + } + + /** + * Computes a normalizedGuaranteed capacity based on active queues. + * + * @param clusterResource + * the total amount of resources in the cluster + * @param queues + * the list of queues to consider + * @param ignoreGuar + * ignore guarantee. + */ + private void resetCapacity(Resource clusterResource, + Collection queues, boolean ignoreGuar) { + Resource activeCap = Resource.newInstance(0, 0); + + if (ignoreGuar) { + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = 1.0f / queues.size(); + } + } else { + for (TempQueuePerPartition q : queues) { + Resources.addTo(activeCap, q.getGuaranteed()); + } + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.getGuaranteed(), activeCap); + } + } + } + + // Take the most underserved TempQueue (the one on the head). Collect and + // return the list of all queues that have the same idealAssigned + // percentage of guaranteed. + private Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, + TQComparator tqComparator) { + ArrayList underserved = new ArrayList<>(); + while (!orderedByNeed.isEmpty()) { + TempQueuePerPartition q1 = orderedByNeed.remove(); + underserved.add(q1); + + // Add underserved queues in order for later uses + context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); + TempQueuePerPartition q2 = orderedByNeed.peek(); + // q1's pct of guaranteed won't be larger than q2's. If it's less, then + // return what has already been collected. Otherwise, q1's pct of + // guaranteed == that of q2, so add q2 to underserved list during the + // next pass. + if (q2 == null || tqComparator.compare(q1, q2) < 0) { + if (null != q2) { + context.addPartitionToUnderServedQueues(q2.queueName, q2.partition); + } + return underserved; + } + } + return underserved; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java new file mode 100644 index 00000000000..dbd1f0a6da0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Abstract temporary data-structure for tracking resource availability,pending + * resource need, current utilization for app/queue. + */ +public class AbstractPreemptionEntity { + // Following fields are copied from scheduler + final String queueName; + + protected final Resource current; + protected final Resource amUsed; + protected final Resource reserved; + protected Resource pending; + + // Following fields are settled and used by candidate selection policies + Resource idealAssigned; + Resource toBePreempted; + Resource selected; + private Resource actuallyToBePreempted; + private Resource toBePreemptFromOther; + + AbstractPreemptionEntity(String queueName, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + this.queueName = queueName; + this.current = usedPerPartition; + this.pending = pendingPerPartition; + this.reserved = reserved; + this.amUsed = amUsedPerPartition; + + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.toBePreemptFromOther = Resource.newInstance(0, 0); + this.selected = Resource.newInstance(0, 0); + } + + public Resource getUsed() { + return current; + } + + public Resource getUsedDeductAM() { + return Resources.subtract(current, amUsed); + } + + public Resource getAMUsed() { + return amUsed; + } + + public Resource getPending() { + return pending; + } + + public Resource getReserved() { + return reserved; + } + + public Resource getActuallyToBePreempted() { + return actuallyToBePreempted; + } + + public void setActuallyToBePreempted(Resource actuallyToBePreempted) { + this.actuallyToBePreempted = actuallyToBePreempted; + } + + public Resource getToBePreemptFromOther() { + return toBePreemptFromOther; + } + + public void setToBePreemptFromOther(Resource toBePreemptFromOther) { + this.toBePreemptFromOther = toBePreemptFromOther; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index c52127d25ed..982b1f15e7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.Set; interface CapacitySchedulerPreemptionContext { @@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext { Set getLeafQueueNames(); Set getAllPartitions(); + + int getClusterMaxApplicationPriority(); + + Resource getPartitionResource(String partition); + + LinkedHashSet getUnderServedQueuesPerPartition(String partition); + + void addPartitionToUnderServedQueues(String queueName, String partition); + + float getMinimumThresholdForIntraQueuePreemption(); + + float getMaxAllowableLimitForIntraQueuePreemption(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index 42d87301ff9..abad2a149d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils { continue; } - // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 + // Only add resToObtainByPartition when actuallyToBePreempted resource >= + // 0 if (Resources.greaterThan(context.getResourceCalculator(), clusterResource, qT.getActuallyToBePreempted(), Resources.none())) { resToObtainByPartition.put(qT.partition, @@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils { return false; } - Set containers = selectedCandidates.get( - container.getApplicationAttemptId()); + Set containers = selectedCandidates + .get(container.getApplicationAttemptId()); if (containers == null) { return false; } @@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils { Map> selectedCandidates) { for (Set containers : selectedCandidates.values()) { for (RMContainer c : containers) { - SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode( - c.getAllocatedNode()); + SchedulerNode schedulerNode = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()); if (null == schedulerNode) { continue; } @@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils { if (null != res) { tq.deductActuallyToBePreempted(context.getResourceCalculator(), tq.totalPartitionResource, res); + Collection tas = tq.getApps(); + if (null == tas || tas.isEmpty()) { + continue; + } + + deductPreemptableResourcePerApp(context, tq.totalPartitionResource, + tas, res, partition); } } } } + + private static void deductPreemptableResourcePerApp( + CapacitySchedulerPreemptionContext context, + Resource totalPartitionResource, Collection tas, + Resource res, String partition) { + for (TempAppPerPartition ta : tas) { + ta.deductActuallyToBePreempted(context.getResourceCalculator(), + totalPartitionResource, res, partition); + } + } + + /** + * Invoke this method to preempt container based on resToObtain. + * + * @param rc + * resource calculator + * @param context + * preemption context + * @param resourceToObtainByPartitions + * map to hold resource to obtain per partition + * @param rmContainer + * container + * @param clusterResource + * total resource + * @param preemptMap + * map to hold preempted containers + * @param totalPreemptionAllowed + * total preemption allowed per round + * @return should we preempt rmContainer. If we should, deduct from + * resourceToObtainByPartition + */ + public static boolean tryPreemptContainerAndDeductResToObtain( + ResourceCalculator rc, CapacitySchedulerPreemptionContext context, + Map resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map> preemptMap, + Resource totalPreemptionAllowed) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(context, + rmContainer.getAllocatedNode()); + Resource toObtainByPartition = resourceToObtainByPartitions + .get(nodePartition); + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none()) + && Resources.fitsIn(rc, clusterResource, + rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + Resources.subtractFrom(totalPreemptionAllowed, + rmContainer.getAllocatedResource()); + + // When we have no more resource need to obtain, remove from map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private static String getPartitionByNodeId( + CapacitySchedulerPreemptionContext context, NodeId nodeId) { + return context.getScheduler().getSchedulerNode(nodeId).getPartition(); + } + + private static void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set = preemptMap.get(appAttemptId); + if (null == set) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + + private static boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers = preemptMap.get(attemptId); + if (null == rmContainers) { + return false; + } + return rmContainers.contains(rmContainer); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index a8c62fd0577..33e4afc373d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -18,11 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -34,9 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -112,9 +107,11 @@ public class FifoCandidatesSelector // Skip already selected containers continue; } - boolean preempted = tryPreemptContainerAndDeductResToObtain( - resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptionAllowed); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, + preemptionContext, resToObtainByPartition, c, + clusterResource, selectedCandidates, + totalPreemptionAllowed); if (!preempted) { continue; } @@ -185,9 +182,10 @@ public class FifoCandidatesSelector break; } - boolean preempted = - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap, totalPreemptionAllowed); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, preemptMap, + totalPreemptionAllowed); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -195,68 +193,6 @@ public class FifoCandidatesSelector skippedAMContainerlist.clear(); } - private boolean preemptMapContains( - Map> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - - /** - * Return should we preempt rmContainer. If we should, deduct from - * resourceToObtainByPartition - */ - private boolean tryPreemptContainerAndDeductResToObtain( - Map resourceToObtainByPartitions, - RMContainer rmContainer, Resource clusterResource, - Map> preemptMap, - Resource totalPreemptionAllowed) { - ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); - - // We will not account resource of a container twice or more - if (preemptMapContains(preemptMap, attemptId, rmContainer)) { - return false; - } - - String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); - Resource toObtainByPartition = - resourceToObtainByPartitions.get(nodePartition); - - if (null != toObtainByPartition && Resources.greaterThan(rc, - clusterResource, toObtainByPartition, Resources.none()) && Resources - .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(), - totalPreemptionAllowed)) { - Resources.subtractFrom(toObtainByPartition, - rmContainer.getAllocatedResource()); - Resources.subtractFrom(totalPreemptionAllowed, - rmContainer.getAllocatedResource()); - - // When we have no more resource need to obtain, remove from map. - if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, - Resources.none())) { - resourceToObtainByPartitions.remove(nodePartition); - } - if (LOG.isDebugEnabled()) { - LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer - .getContainerId() + " from partition=" + nodePartition + " queue=" - + rmContainer.getQueueName() + " to be preemption candidates"); - } - // Add to preemptMap - addToPreemptMap(preemptMap, attemptId, rmContainer); - return true; - } - - return false; - } - - private String getPartitionByNodeId(NodeId nodeId) { - return preemptionContext.getScheduler().getSchedulerNode(nodeId) - .getPartition(); - } - /** * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservation for that app). @@ -268,10 +204,6 @@ public class FifoCandidatesSelector Map> selectedContainers, Resource totalPreemptionAllowed) { ApplicationAttemptId appId = app.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Looking at application=" + app.getApplicationAttemptId() - + " resourceToObtain=" + resToObtainByPartition); - } // first drop reserved containers towards rsrcPreempt List reservedContainers = @@ -286,8 +218,9 @@ public class FifoCandidatesSelector } // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, selectedContainers, totalPreemptionAllowed); + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedContainers, totalPreemptionAllowed); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -328,41 +261,9 @@ public class FifoCandidatesSelector } // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, selectedContainers, totalPreemptionAllowed); + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedContainers, totalPreemptionAllowed); } } - - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List containers){ - Collections.sort(containers, new Comparator() { - @Override - public int compare(RMContainer a, RMContainer b) { - Comparator c = new org.apache.hadoop.yarn.server - .resourcemanager.resource.Priority.Comparator(); - int priorityComp = c.compare(b.getContainer().getPriority(), - a.getContainer().getPriority()); - if (priorityComp != 0) { - return priorityComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); - } - - private void addToPreemptMap( - Map> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); - } - set.add(containerToPreempt); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java new file mode 100644 index 00000000000..757f56722ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for + * priority and user-limit. + */ +public class FifoIntraQueuePreemptionPlugin + implements + IntraQueuePreemptionComputePlugin { + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + + private static final Log LOG = + LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class); + + public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc, + CapacitySchedulerPreemptionContext preemptionContext) { + this.context = preemptionContext; + this.rc = rc; + } + + @Override + public Map getResourceDemandFromAppsPerQueue( + String queueName, String partition) { + + Map resToObtainByPartition = new HashMap<>(); + TempQueuePerPartition tq = context + .getQueueByPartition(queueName, partition); + + Collection appsOrderedByPriority = tq.getApps(); + Resource actualPreemptNeeded = resToObtainByPartition.get(partition); + + // Updating pending resource per-partition level. + if (actualPreemptNeeded == null) { + actualPreemptNeeded = Resources.createResource(0, 0); + resToObtainByPartition.put(partition, actualPreemptNeeded); + } + + for (TempAppPerPartition a1 : appsOrderedByPriority) { + Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Selected to preempt " + actualPreemptNeeded + + " resource from partition:" + partition); + } + return resToObtainByPartition; + } + + @Override + public void computeAppsIdealAllocation(Resource clusterResource, + Resource partitionBasedResource, TempQueuePerPartition tq, + Map> selectedCandidates, + Resource totalPreemptedResourceAllowed, + Resource queueReassignableResource, float maxAllowablePreemptLimit) { + + // 1. AM used resource can be considered as a frozen resource for now. + // Hence such containers in a queue can be omitted from the preemption + // calculation. + Map perUserAMUsed = new HashMap(); + Resource amUsed = calculateUsedAMResourcesPerQueue(tq.partition, + tq.leafQueue, perUserAMUsed); + Resources.subtractFrom(queueReassignableResource, amUsed); + + // 2. tq.leafQueue will not be null as we validated it in caller side + Collection apps = tq.leafQueue.getAllApplications(); + + // We do not need preemption for a single app + if (apps.size() == 1) { + return; + } + + // 3. Create all tempApps for internal calculation and return a list from + // high priority to low priority order. + TAPriorityComparator taComparator = new TAPriorityComparator(); + PriorityQueue orderedByPriority = + createTempAppForResCalculation(tq.partition, apps, taComparator); + + // 4. Calculate idealAssigned per app by checking based on queue's + // unallocated resource.Also return apps arranged from lower priority to + // higher priority. + TreeSet orderedApps = + calculateIdealAssignedResourcePerApp(clusterResource, + partitionBasedResource, tq, selectedCandidates, + queueReassignableResource, orderedByPriority, perUserAMUsed); + + // 5. A configurable limit that could define an ideal allowable preemption + // limit. Based on current queue's capacity,defined how much % could become + // preemptable. + Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(), + maxAllowablePreemptLimit); + if (Resources.greaterThan(rc, clusterResource, maxIntraQueuePreemptable, + tq.getActuallyToBePreempted())) { + Resources.subtractFrom(maxIntraQueuePreemptable, + tq.getActuallyToBePreempted()); + } else { + maxIntraQueuePreemptable = Resource.newInstance(0, 0); + } + + // 6. We have two configurations here, one is intra queue limit and second + // one is per-round limit for any time preemption. Take a minimum of these + Resource preemptionLimit = Resources.min(rc, clusterResource, + maxIntraQueuePreemptable, totalPreemptedResourceAllowed); + + // 7. From lowest priority app onwards, calculate toBePreempted resource + // based on demand. + calculateToBePreemptedResourcePerApp(clusterResource, orderedApps, + preemptionLimit); + + // Save all apps (low to high) to temp queue for further reference + tq.addAllApps(orderedApps); + + // 8. There are chances that we may preempt for the demand from same + // priority level, such cases are to be validated out. + validateOutSameAppPriorityFromDemand(clusterResource, + (TreeSet) tq.getApps()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); + for (TempAppPerPartition tmpApp : tq.getApps()) { + LOG.debug(tmpApp); + } + } + } + + private void calculateToBePreemptedResourcePerApp(Resource clusterResource, + TreeSet orderedApps, Resource preemptionLimit) { + + for (TempAppPerPartition tmpApp : orderedApps) { + if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit, + Resources.none()) + || Resources.lessThanOrEqual(rc, clusterResource, tmpApp.getUsed(), + Resources.none())) { + continue; + } + + Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(), + tmpApp.idealAssigned); + Resources.subtractFrom(preemtableFromApp, tmpApp.selected); + Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed()); + + // Calculate toBePreempted from apps as follows: + // app.preemptable = min(max(app.used - app.selected - app.ideal, 0), + // intra_q_preemptable) + tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources + .max(rc, clusterResource, preemtableFromApp, Resources.none()), + preemptionLimit); + + preemptionLimit = Resources.subtract(preemptionLimit, + tmpApp.toBePreempted); + } + } + + /** + * Algorithm for calculating idealAssigned is as follows: + * For each partition: + * Q.reassignable = Q.used - Q.selected; + * + * # By default set ideal assigned 0 for app. + * app.idealAssigned as 0 + * # get user limit from scheduler. + * userLimitRes = Q.getUserLimit(userName) + * + * # initial all value to 0 + * Map userToAllocated + * + * # Loop from highest priority to lowest priority app to calculate ideal + * for app in sorted-by(priority) { + * if Q.reassignable < 0: + * break; + * + * if (user-to-allocated.get(app.user) < userLimitRes) { + * idealAssigned = min((userLimitRes - userToAllocated.get(app.user)), + * (app.used + app.pending - app.selected)) + * app.idealAssigned = min(Q.reassignable, idealAssigned) + * userToAllocated.get(app.user) += app.idealAssigned; + * } else { + * // skip this app because user-limit reached + * } + * Q.reassignable -= app.idealAssigned + * } + * + * @param clusterResource Cluster Resource + * @param partitionBasedResource resource per partition + * @param tq TempQueue + * @param selectedCandidates Already Selected preemption candidates + * @param queueReassignableResource Resource used in a queue + * @param orderedByPriority List of running apps + * @param perUserAMUsed AM used resource + * @return List of temp apps ordered from low to high priority + */ + private TreeSet calculateIdealAssignedResourcePerApp( + Resource clusterResource, Resource partitionBasedResource, + TempQueuePerPartition tq, + Map> selectedCandidates, + Resource queueReassignableResource, + PriorityQueue orderedByPriority, + Map perUserAMUsed) { + + Comparator reverseComp = Collections + .reverseOrder(new TAPriorityComparator()); + TreeSet orderedApps = new TreeSet<>(reverseComp); + + Map userIdealAssignedMapping = new HashMap<>(); + String partition = tq.partition; + + Map preCalculatedUserLimit = + new HashMap(); + + while (!orderedByPriority.isEmpty()) { + // Remove app from the next highest remaining priority and process it to + // calculate idealAssigned per app. + TempAppPerPartition tmpApp = orderedByPriority.remove(); + orderedApps.add(tmpApp); + + // Once unallocated resource is 0, we can stop assigning ideal per app. + if (Resources.lessThanOrEqual(rc, clusterResource, + queueReassignableResource, Resources.none())) { + continue; + } + + String userName = tmpApp.app.getUser(); + Resource userLimitResource = preCalculatedUserLimit.get(userName); + + // Verify whether we already calculated headroom for this user. + if (userLimitResource == null) { + userLimitResource = Resources.clone(tq.leafQueue + .getUserLimitPerUser(userName, partitionBasedResource, partition)); + + Resource amUsed = perUserAMUsed.get(userName); + if (null == amUsed) { + amUsed = Resources.createResource(0, 0); + } + + // Real AM used need not have to be considered for user-limit as well. + userLimitResource = Resources.subtract(userLimitResource, amUsed); + if (LOG.isDebugEnabled()) { + LOG.debug("Userlimit for user '" + userName + "' is :" + + userLimitResource + ", and amUsed is:" + amUsed); + } + + preCalculatedUserLimit.put(userName, userLimitResource); + } + + Resource idealAssignedForUser = userIdealAssignedMapping.get(userName); + + if (idealAssignedForUser == null) { + idealAssignedForUser = Resources.createResource(0, 0); + userIdealAssignedMapping.put(userName, idealAssignedForUser); + } + + // Calculate total selected container resources from current app. + getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, + tmpApp, partition); + + // For any app, used+pending will give its idealAssigned. However it will + // be tightly linked to queue's unallocated quota. So lower priority apps + // idealAssigned may fall to 0 if higher priority apps demand is more. + Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(), + tmpApp.getPending()); + Resources.subtractFrom(appIdealAssigned, tmpApp.selected); + + if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, + userLimitResource)) { + appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned, + Resources.subtract(userLimitResource, idealAssignedForUser)); + tmpApp.idealAssigned = Resources.clone(Resources.min(rc, + clusterResource, queueReassignableResource, appIdealAssigned)); + Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned); + } else { + continue; + } + + // Also set how much resource is needed by this app from others. + Resource appUsedExcludedSelected = Resources + .subtract(tmpApp.getUsedDeductAM(), tmpApp.selected); + if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned, + appUsedExcludedSelected)) { + tmpApp.setToBePreemptFromOther( + Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected)); + } + + Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned); + } + + return orderedApps; + } + + /* + * Previous policies would have already selected few containers from an + * application. Calculate total resource from these selected containers. + */ + private void getAlreadySelectedPreemptionCandidatesResource( + Map> selectedCandidates, + TempAppPerPartition tmpApp, String partition) { + tmpApp.selected = Resources.createResource(0, 0); + Set containers = selectedCandidates + .get(tmpApp.app.getApplicationAttemptId()); + + if (containers == null) { + return; + } + + for (RMContainer cont : containers) { + if (partition.equals(cont.getNodeLabelExpression())) { + Resources.addTo(tmpApp.selected, cont.getAllocatedResource()); + } + } + } + + private PriorityQueue createTempAppForResCalculation( + String partition, Collection apps, + TAPriorityComparator taComparator) { + PriorityQueue orderedByPriority = new PriorityQueue<>( + 100, taComparator); + + // have an internal temp app structure to store intermediate data(priority) + for (FiCaSchedulerApp app : apps) { + + Resource used = app.getAppAttemptResourceUsage().getUsed(partition); + Resource amUsed = null; + if (!app.isWaitingForAMContainer()) { + amUsed = app.getAMResource(partition); + } + Resource pending = app.getTotalPendingRequestsPerPartition() + .get(partition); + Resource reserved = app.getAppAttemptResourceUsage() + .getReserved(partition); + + used = (used == null) ? Resources.createResource(0, 0) : used; + amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed; + pending = (pending == null) ? Resources.createResource(0, 0) : pending; + reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved; + + HashSet partitions = new HashSet( + app.getAppAttemptResourceUsage().getNodePartitionsSet()); + partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet()); + + // Create TempAppPerQueue for further calculation. + TempAppPerPartition tmpApp = new TempAppPerPartition(app, + Resources.clone(used), Resources.clone(amUsed), + Resources.clone(reserved), Resources.clone(pending)); + + // Set ideal allocation of app as 0. + tmpApp.idealAssigned = Resources.createResource(0, 0); + + orderedByPriority.add(tmpApp); + } + return orderedByPriority; + } + + /* + * Fifo+Priority based preemption policy need not have to preempt resources at + * same priority level. Such cases will be validated out. + */ + public void validateOutSameAppPriorityFromDemand(Resource cluster, + TreeSet appsOrderedfromLowerPriority) { + + TempAppPerPartition[] apps = appsOrderedfromLowerPriority + .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]); + if (apps.length <= 0) { + return; + } + + int lPriority = 0; + int hPriority = apps.length - 1; + + while (lPriority < hPriority + && !apps[lPriority].equals(apps[hPriority]) + && apps[lPriority].getPriority() < apps[hPriority].getPriority()) { + Resource toPreemptFromOther = apps[hPriority] + .getToBePreemptFromOther(); + Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted(); + Resource delta = Resources.subtract(apps[lPriority].toBePreempted, + actuallyToPreempt); + + if (Resources.greaterThan(rc, cluster, delta, Resources.none())) { + Resource toPreempt = Resources.min(rc, cluster, + toPreemptFromOther, delta); + + apps[hPriority].setToBePreemptFromOther( + Resources.subtract(toPreemptFromOther, toPreempt)); + apps[lPriority].setActuallyToBePreempted( + Resources.add(actuallyToPreempt, toPreempt)); + } + + if (Resources.lessThanOrEqual(rc, cluster, + apps[lPriority].toBePreempted, + apps[lPriority].getActuallyToBePreempted())) { + lPriority++; + continue; + } + + if (Resources.equals(apps[hPriority].getToBePreemptFromOther(), + Resources.none())) { + hPriority--; + continue; + } + } + } + + private Resource calculateUsedAMResourcesPerQueue(String partition, + LeafQueue leafQueue, Map perUserAMUsed) { + Collection runningApps = leafQueue.getApplications(); + Resource amUsed = Resources.createResource(0, 0); + + for (FiCaSchedulerApp app : runningApps) { + Resource userAMResource = perUserAMUsed.get(app.getUser()); + if (null == userAMResource) { + userAMResource = Resources.createResource(0, 0); + perUserAMUsed.put(app.getUser(), userAMResource); + } + + Resources.addTo(userAMResource, app.getAMResource(partition)); + Resources.addTo(amUsed, app.getAMResource(partition)); + } + return amUsed; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java new file mode 100644 index 00000000000..039b53e667e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Identifies over utilized resources within a queue and tries to normalize + * them to resolve resource allocation anomalies w.r.t priority and user-limit. + */ +public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { + + @SuppressWarnings("serial") + static class TAPriorityComparator + implements + Serializable, + Comparator { + + @Override + public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { + Priority p1 = Priority.newInstance(tq1.getPriority()); + Priority p2 = Priority.newInstance(tq2.getPriority()); + + if (!p1.equals(p2)) { + return p1.compareTo(p2); + } + return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + } + } + + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; + final CapacitySchedulerPreemptionContext context; + + private static final Log LOG = + LogFactory.getLog(IntraQueueCandidatesSelector.class); + + IntraQueueCandidatesSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc, + preemptionContext); + context = preemptionContext; + } + + @Override + public Map> selectCandidates( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed) { + + // 1. Calculate the abnormality within each queue one by one. + computeIntraQueuePreemptionDemand( + clusterResource, totalPreemptedResourceAllowed, selectedCandidates); + + // 2. Previous selectors (with higher priority) could have already + // selected containers. We need to deduct pre-emptable resources + // based on already selected candidates. + CapacitySchedulerPreemptionUtils + .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext, + selectedCandidates); + + // 3. Loop through all partitions to select containers for preemption. + for (String partition : preemptionContext.getAllPartitions()) { + LinkedHashSet queueNames = preemptionContext + .getUnderServedQueuesPerPartition(partition); + + // Error check to handle non-mapped labels to queue. + if (null == queueNames) { + continue; + } + + // 4. Iterate from most under-served queue in order. + for (String queueName : queueNames) { + LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).leafQueue; + + // skip if not a leafqueue + if (null == leafQueue) { + continue; + } + + // 5. Calculate the resource to obtain per partition + Map resToObtainByPartition = fifoPreemptionComputePlugin + .getResourceDemandFromAppsPerQueue(queueName, partition); + + // 6. Based on the selected resource demand per partition, select + // containers with known policy from inter-queue preemption. + synchronized (leafQueue) { + Iterator desc = leafQueue.getOrderingPolicy() + .getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp app = desc.next(); + preemptFromLeastStarvedApp(selectedCandidates, clusterResource, + totalPreemptedResourceAllowed, resToObtainByPartition, + leafQueue, app); + } + } + } + } + + return selectedCandidates; + } + + private void preemptFromLeastStarvedApp( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed, + Map resToObtainByPartition, LeafQueue leafQueue, + FiCaSchedulerApp app) { + + // ToDo: Reuse reservation selector here. + + List liveContainers = new ArrayList<>( + app.getLiveContainers()); + sortContainers(liveContainers); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "totalPreemptedResourceAllowed for preemption at this round is :" + + totalPreemptedResourceAllowed); + } + + for (RMContainer c : liveContainers) { + + // if there are no demand, return. + if (resToObtainByPartition.isEmpty()) { + return; + } + + // skip preselected containers. + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + continue; + } + + // Skip already marked to killable containers + if (null != preemptionContext.getKillableContainers() && preemptionContext + .getKillableContainers().contains(c.getContainerId())) { + continue; + } + + // Skip AM Container from preemption for now. + if (c.isAMContainer()) { + continue; + } + + // Try to preempt this container + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedCandidates, totalPreemptedResourceAllowed); + } + + } + + private void computeIntraQueuePreemptionDemand(Resource clusterResource, + Resource totalPreemptedResourceAllowed, + Map> selectedCandidates) { + + // 1. Iterate through all partition to calculate demand within a partition. + for (String partition : context.getAllPartitions()) { + LinkedHashSet queueNames = context + .getUnderServedQueuesPerPartition(partition); + + if (null == queueNames) { + continue; + } + + // 2. Its better to get partition based resource limit earlier before + // starting calculation + Resource partitionBasedResource = + context.getPartitionResource(partition); + + // 3. loop through all queues corresponding to a partition. + for (String queueName : queueNames) { + TempQueuePerPartition tq = context.getQueueByPartition(queueName, + partition); + LeafQueue leafQueue = tq.leafQueue; + + // skip if its parent queue + if (null == leafQueue) { + continue; + } + + // 4. Consider reassignableResource as (used - actuallyToBePreempted). + // This provides as upper limit to split apps quota in a queue. + Resource queueReassignableResource = Resources.subtract(tq.getUsed(), + tq.getActuallyToBePreempted()); + + // 5. Check queue's used capacity. Make sure that the used capacity is + // above certain limit to consider for intra queue preemption. + if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context + .getMinimumThresholdForIntraQueuePreemption()) { + continue; + } + + // 6. compute the allocation of all apps based on queue's unallocated + // capacity + fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource, + partitionBasedResource, tq, selectedCandidates, + totalPreemptedResourceAllowed, + queueReassignableResource, + context.getMaxAllowableLimitForIntraQueuePreemption()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java new file mode 100644 index 00000000000..93ebe65049d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + + +interface IntraQueuePreemptionComputePlugin { + + Map getResourceDemandFromAppsPerQueue(String queueName, + String partition); + + void computeAppsIdealAllocation(Resource clusterResource, + Resource partitionBasedResource, TempQueuePerPartition tq, + Map> selectedCandidates, + Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned, + float maxAllowablePreemptLimit); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 103b4193f0d..3017e8fa6f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.PriorityQueue; import java.util.Set; /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector} */ -public class PreemptableResourceCalculator { +public class PreemptableResourceCalculator + extends + AbstractPreemptableResourceCalculator { private static final Log LOG = LogFactory.getLog(PreemptableResourceCalculator.class); - private final CapacitySchedulerPreemptionContext context; - private final ResourceCalculator rc; private boolean isReservedPreemptionCandidatesSelector; - static class TQComparator implements Comparator { - private ResourceCalculator rc; - private Resource clusterRes; - - TQComparator(ResourceCalculator rc, Resource clusterRes) { - this.rc = rc; - this.clusterRes = clusterRes; - } - - @Override - public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { - if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { - return -1; - } - if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { - return 1; - } - return 0; - } - - // Calculates idealAssigned / guaranteed - // TempQueues with 0 guarantees are always considered the most over - // capacity and therefore considered last for resources. - private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { - double pctOver = Integer.MAX_VALUE; - if (q != null && Resources.greaterThan(rc, clusterRes, - q.getGuaranteed(), - Resources.none())) { - pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, - q.getGuaranteed()); - } - return (pctOver); - } - } - /** * PreemptableResourceCalculator constructor * @@ -93,136 +54,7 @@ public class PreemptableResourceCalculator { public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, boolean isReservedPreemptionCandidatesSelector) { - context = preemptionContext; - rc = preemptionContext.getResourceCalculator(); - this.isReservedPreemptionCandidatesSelector = - isReservedPreemptionCandidatesSelector; - } - - /** - * Computes a normalizedGuaranteed capacity based on active queues - * @param rc resource calculator - * @param clusterResource the total amount of resources in the cluster - * @param queues the list of queues to consider - */ - private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection queues, boolean ignoreGuar) { - Resource activeCap = Resource.newInstance(0, 0); - - if (ignoreGuar) { - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = 1.0f / queues.size(); - } - } else { - for (TempQueuePerPartition q : queues) { - Resources.addTo(activeCap, q.getGuaranteed()); - } - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.getGuaranteed(), activeCap); - } - } - } - - // Take the most underserved TempQueue (the one on the head). Collect and - // return the list of all queues that have the same idealAssigned - // percentage of guaranteed. - protected Collection getMostUnderservedQueues( - PriorityQueue orderedByNeed, - TQComparator tqComparator) { - ArrayList underserved = new ArrayList<>(); - while (!orderedByNeed.isEmpty()) { - TempQueuePerPartition q1 = orderedByNeed.remove(); - underserved.add(q1); - TempQueuePerPartition q2 = orderedByNeed.peek(); - // q1's pct of guaranteed won't be larger than q2's. If it's less, then - // return what has already been collected. Otherwise, q1's pct of - // guaranteed == that of q2, so add q2 to underserved list during the - // next pass. - if (q2 == null || tqComparator.compare(q1,q2) < 0) { - return underserved; - } - } - return underserved; - } - - - /** - * Given a set of queues compute the fix-point distribution of unassigned - * resources among them. As pending request of a queue are exhausted, the - * queue is removed from the set and remaining capacity redistributed among - * remaining queues. The distribution is weighted based on guaranteed - * capacity, unless asked to ignoreGuarantee, in which case resources are - * distributed uniformly. - */ - private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection qAlloc, - Resource unassigned, boolean ignoreGuarantee) { - // Prior to assigning the unused resources, process each queue as follows: - // If current > guaranteed, idealAssigned = guaranteed + untouchable extra - // Else idealAssigned = current; - // Subtract idealAssigned resources from unassigned. - // If the queue has all of its needs met (that is, if - // idealAssigned >= current + pending), remove it from consideration. - // Sort queues from most under-guaranteed to most over-guaranteed. - TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = new PriorityQueue<>(10, - tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { - TempQueuePerPartition q = i.next(); - Resource used = q.getUsed(); - - if (Resources.greaterThan(rc, tot_guarant, used, - q.getGuaranteed())) { - q.idealAssigned = Resources.add( - q.getGuaranteed(), q.untouchableExtra); - } else { - q.idealAssigned = Resources.clone(used); - } - Resources.subtractFrom(unassigned, q.idealAssigned); - // If idealAssigned < (allocated + used + pending), q needs more resources, so - // add it to the list of underserved queues, ordered by need. - Resource curPlusPend = Resources.add(q.getUsed(), q.pending); - if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { - orderedByNeed.add(q); - } - } - - //assign all cluster resources until no more demand, or no resources are left - while (!orderedByNeed.isEmpty() - && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); - // we compute normalizedGuarantees capacity based on currently active - // queues - resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); - - // For each underserved queue (or set of queues if multiple are equally - // underserved), offer its share of the unassigned resources based on its - // normalized guarantee. After the offer, if the queue is not satisfied, - // place it back in the ordered list of queues, recalculating its place - // in the order of most under-guaranteed to most over-guaranteed. In this - // way, the most underserved queue(s) are always given resources first. - Collection underserved = - getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator i = underserved.iterator(); i - .hasNext();) { - TempQueuePerPartition sub = i.next(); - Resource wQavail = Resources.multiplyAndNormalizeUp(rc, - unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant, - isReservedPreemptionCandidatesSelector); - Resource wQdone = Resources.subtract(wQavail, wQidle); - - if (Resources.greaterThan(rc, tot_guarant, - wQdone, Resources.none())) { - // The queue is still asking for more. Put it back in the priority - // queue, recalculating its order based on need. - orderedByNeed.add(sub); - } - Resources.addTo(wQassigned, wQdone); - } - Resources.subtractFrom(unassigned, wQassigned); - } + super(preemptionContext, isReservedPreemptionCandidatesSelector); } /** @@ -263,14 +95,14 @@ public class PreemptableResourceCalculator { } // first compute the allocation as a fixpoint based on guaranteed capacity - computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned, + computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned, false); // if any capacity is left unassigned, distributed among zero-guarantee // queues uniformly (i.e., not based on guaranteed capacity, as this is zero) if (!zeroGuarQueues.isEmpty() && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) { - computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned, + computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned, true); } @@ -321,13 +153,12 @@ public class PreemptableResourceCalculator { computeIdealResourceDistribution(rc, root.getChildren(), totalPreemptionAllowed, root.idealAssigned); // compute recursively for lower levels and build list of leafs - for(TempQueuePerPartition t : root.getChildren()) { + for (TempQueuePerPartition t : root.getChildren()) { recursivelyComputeIdealAssignment(t, totalPreemptionAllowed); } } } - private void calculateResToObtainByPartitionForLeafQueues( Set leafQueueNames, Resource clusterResource) { // Loop all leaf queues diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index dd33d8f2f13..c74e34e20dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -19,10 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import com.google.common.annotations.VisibleForTesting; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -41,7 +47,7 @@ public abstract class PreemptionCandidatesSelector { * selected candidates. * * @param selectedCandidates already selected candidates from previous policies - * @param clusterResource + * @param clusterResource total resource * @param totalPreemptedResourceAllowed how many resources allowed to be * preempted in this round * @return merged selected candidates. @@ -49,4 +55,28 @@ public abstract class PreemptionCandidatesSelector { public abstract Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed); + + /** + * Compare by reversed priority order first, and then reversed containerId + * order. + * + * @param containers list of containers to sort for. + */ + @VisibleForTesting + static void sortContainers(List containers) { + Collections.sort(containers, new Comparator() { + @Override + public int compare(RMContainer a, RMContainer b) { + Comparator c = new org.apache.hadoop.yarn.server + .resourcemanager.resource.Priority.Comparator(); + int priorityComp = c.compare(b.getContainer().getPriority(), + a.getContainer().getPriority()); + if (priorityComp != 0) { + return priorityComp; + } + return b.getContainerId().compareTo(a.getContainerId()); + } + }); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index bf19f5a62fc..4ff0bf3491d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,6 +92,9 @@ public class ProportionalCapacityPreemptionPolicy private boolean observeOnly; private boolean lazyPreempionEnabled; + private float maxAllowableLimitForIntraQueuePreemption; + private float minimumThresholdForIntraQueuePreemption; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -102,6 +106,8 @@ public class ProportionalCapacityPreemptionPolicy new HashMap<>(); private Map> queueToPartitions = new HashMap<>(); + private Map> partitionToUnderServedQueues = + new HashMap>(); private List candidatesSelectionPolicies = new ArrayList<>(); private Set allPartitions; @@ -171,23 +177,44 @@ public class ProportionalCapacityPreemptionPolicy CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); + maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( + CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + CapacitySchedulerConfiguration. + DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); + + minimumThresholdForIntraQueuePreemption = csConfig.getFloat( + CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, + CapacitySchedulerConfiguration. + DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); + rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); // Do we need to specially consider reserved containers? boolean selectCandidatesForResevedContainers = csConfig.getBoolean( - CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, - CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); + CapacitySchedulerConfiguration. + PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + CapacitySchedulerConfiguration. + DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); if (selectCandidatesForResevedContainers) { - candidatesSelectionPolicies.add( - new ReservedContainerCandidatesSelector(this)); + candidatesSelectionPolicies + .add(new ReservedContainerCandidatesSelector(this)); } // initialize candidates preemption selection policies - candidatesSelectionPolicies.add( - new FifoCandidatesSelector(this)); + candidatesSelectionPolicies.add(new FifoCandidatesSelector(this)); + + // Do we need to specially consider intra queue + boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, + CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); + if (isIntraQueuePreemptionEnabled) { + candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); + } } - + @Override public ResourceCalculator getResourceCalculator() { return rc; @@ -209,6 +236,12 @@ public class ProportionalCapacityPreemptionPolicy @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( Map> selectedCandidates) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Starting to preempt containers for selectedCandidates and size:" + + selectedCandidates.size()); + } + // preempt (or kill) the selected containers for (Map.Entry> e : selectedCandidates .entrySet()) { @@ -233,6 +266,7 @@ public class ProportionalCapacityPreemptionPolicy // not have to raise another event. continue; } + //otherwise just send preemption events rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, @@ -291,7 +325,6 @@ public class ProportionalCapacityPreemptionPolicy * @param root the root of the CapacityScheduler queue hierarchy * @param clusterResources the total amount of resources in the cluster */ - @SuppressWarnings("unchecked") private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { // Sync killable containers from scheduler when lazy preemption enabled @@ -537,4 +570,41 @@ public class ProportionalCapacityPreemptionPolicy Map> getQueuePartitions() { return queueToPartitions; } + + @Override + public int getClusterMaxApplicationPriority() { + return scheduler.getMaxClusterLevelAppPriority().getPriority(); + } + + @Override + public float getMaxAllowableLimitForIntraQueuePreemption() { + return maxAllowableLimitForIntraQueuePreemption; + } + + @Override + public float getMinimumThresholdForIntraQueuePreemption() { + return minimumThresholdForIntraQueuePreemption; + } + + @Override + public Resource getPartitionResource(String partition) { + return Resources.clone(nlm.getResourceByLabel(partition, + Resources.clone(scheduler.getClusterResource()))); + } + + public LinkedHashSet getUnderServedQueuesPerPartition( + String partition) { + return partitionToUnderServedQueues.get(partition); + } + + public void addPartitionToUnderServedQueues(String queueName, + String partition) { + LinkedHashSet underServedQueues = partitionToUnderServedQueues + .get(partition); + if (null == underServedQueues) { + underServedQueues = new LinkedHashSet(); + partitionToUnderServedQueues.put(partition, underServedQueues); + } + underServedQueues.add(queueName); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java new file mode 100644 index 00000000000..fccd2a74732 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization for an application. + */ +public class TempAppPerPartition extends AbstractPreemptionEntity { + + // Following fields are settled and used by candidate selection policies + private final int priority; + private final ApplicationId applicationId; + + FiCaSchedulerApp app; + + TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved, + pendingPerPartition); + + this.priority = app.getPriority().getPriority(); + this.applicationId = app.getApplicationId(); + this.app = app; + } + + public FiCaSchedulerApp getFiCaSchedulerApp() { + return app; + } + + public void assignPreemption(Resource killable) { + Resources.addTo(toBePreempted, killable); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority) + .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending) + .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" PREEMPT_OTHER: ") + .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ") + .append(toBePreempted).append(" ACTUAL_PREEMPT: ") + .append(getActuallyToBePreempted()).append("\n"); + + return sb.toString(); + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ").append(getUsed().getMemorySize()) + .append(", ").append(getUsed().getVirtualCores()).append(", ") + .append(pending.getMemorySize()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(idealAssigned.getMemorySize()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemorySize()).append(", ") + .append(toBePreempted.getVirtualCores()).append(", ") + .append(getActuallyToBePreempted().getMemorySize()).append(", ") + .append(getActuallyToBePreempted().getVirtualCores()); + } + + public int getPriority() { + return priority; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, + Resource cluster, Resource toBeDeduct, String partition) { + if (Resources.greaterThan(resourceCalculator, cluster, + getActuallyToBePreempted(), toBeDeduct)) { + Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 04ed1358806..28099c449f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Collection; /** * Temporary data-structure tracking resource availability, pending resource * need, current utilization. This is per-queue-per-partition data structure */ -public class TempQueuePerPartition { +public class TempQueuePerPartition extends AbstractPreemptionEntity { // Following fields are copied from scheduler - final String queueName; final String partition; - final Resource pending; - private final Resource current; private final Resource killable; - private final Resource reserved; private final float absCapacity; private final float absMaxCapacity; final Resource totalPartitionResource; - // Following fields are setted and used by candidate selection policies - Resource idealAssigned; - Resource toBePreempted; + // Following fields are settled and used by candidate selection policies Resource untouchableExtra; Resource preemptableExtra; - private Resource actuallyToBePreempted; double normalizedGuarantee; final ArrayList children; + private Collection apps; LeafQueue leafQueue; boolean preemptionDisabled; @@ -60,8 +55,8 @@ public class TempQueuePerPartition { boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, Resource reserved, CSQueue queue) { - this.queueName = queueName; - this.current = current; + super(queueName, current, Resource.newInstance(0, 0), reserved, + Resource.newInstance(0, 0)); if (queue instanceof LeafQueue) { LeafQueue l = (LeafQueue) queue; @@ -72,11 +67,9 @@ public class TempQueuePerPartition { pending = Resources.createResource(0); } - this.idealAssigned = Resource.newInstance(0, 0); - this.actuallyToBePreempted = Resource.newInstance(0, 0); - this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; this.children = new ArrayList<>(); + this.apps = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; @@ -85,7 +78,6 @@ public class TempQueuePerPartition { this.absCapacity = absCapacity; this.absMaxCapacity = absMaxCapacity; this.totalPartitionResource = totalPartitionResource; - this.reserved = reserved; } public void setLeafQueue(LeafQueue l) { @@ -95,7 +87,9 @@ public class TempQueuePerPartition { /** * When adding a child we also aggregate its pending resource needs. - * @param q the child queue to add to this queue + * + * @param q + * the child queue to add to this queue */ public void addChild(TempQueuePerPartition q) { assert leafQueue == null; @@ -103,14 +97,10 @@ public class TempQueuePerPartition { Resources.addTo(pending, q.pending); } - public ArrayList getChildren(){ + public ArrayList getChildren() { return children; } - public Resource getUsed() { - return current; - } - public Resource getUsedDeductReservd() { return Resources.subtract(current, reserved); } @@ -122,28 +112,30 @@ public class TempQueuePerPartition { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + // remain = avail - min(avail, (max - assigned), (current + pending - + // assigned)) Resource accepted = Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, - Resources - /* - * When we're using FifoPreemptionSelector - * (considerReservedResource = false). - * - * We should deduct reserved resource to avoid excessive preemption: - * - * For example, if an under-utilized queue has used = reserved = 20. - * Preemption policy will try to preempt 20 containers - * (which is not satisfied) from different hosts. - * - * In FifoPreemptionSelector, there's no guarantee that preempted - * resource can be used by pending request, so policy will preempt - * resources repeatly. - */ - .subtract(Resources.add( - (considersReservedResource ? getUsed() : - getUsedDeductReservd()), - pending), idealAssigned))); + absMaxCapIdealAssignedDelta, + Resources.min(rc, clusterResource, avail, Resources + /* + * When we're using FifoPreemptionSelector (considerReservedResource + * = false). + * + * We should deduct reserved resource to avoid excessive preemption: + * + * For example, if an under-utilized queue has used = reserved = 20. + * Preemption policy will try to preempt 20 containers (which is not + * satisfied) from different hosts. + * + * In FifoPreemptionSelector, there's no guarantee that preempted + * resource can be used by pending request, so policy will preempt + * resources repeatly. + */ + .subtract( + Resources.add((considersReservedResource + ? getUsed() + : getUsedDeductReservd()), pending), + idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; @@ -162,8 +154,7 @@ public class TempQueuePerPartition { untouchableExtra = Resources.none(); preemptableExtra = Resources.none(); - Resource extra = Resources.subtract(getUsed(), - getGuaranteed()); + Resource extra = Resources.subtract(getUsed(), getGuaranteed()); if (Resources.lessThan(rc, totalPartitionResource, extra, Resources.none())) { extra = Resources.none(); @@ -197,26 +188,21 @@ public class TempQueuePerPartition { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(" NAME: " + queueName) - .append(" CUR: ").append(current) - .append(" PEN: ").append(pending) - .append(" RESERVED: ").append(reserved) - .append(" GAR: ").append(getGuaranteed()) - .append(" NORM: ").append(normalizedGuarantee) - .append(" IDEAL_ASSIGNED: ").append(idealAssigned) - .append(" IDEAL_PREEMPT: ").append(toBePreempted) - .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) + sb.append(" NAME: " + queueName).append(" CUR: ").append(current) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" GAR: ").append(getGuaranteed()).append(" NORM: ") + .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted()) .append(" UNTOUCHABLE: ").append(untouchableExtra) - .append(" PREEMPTABLE: ").append(preemptableExtra) - .append("\n"); + .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n"); return sb.toString(); } public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - Resource usedDeductKillable = Resources.subtract( - getUsed(), killable); + Resource usedDeductKillable = Resources.subtract(getUsed(), killable); Resource totalResource = Resources.add(getUsed(), pending); // The minimum resource that we need to keep for a queue is: @@ -224,7 +210,8 @@ public class TempQueuePerPartition { // // Doing this because when we calculate ideal allocation doesn't consider // reserved resource, ideal-allocation calculated could be less than - // guaranteed and total. We should avoid preempt from a queue if it is already + // guaranteed and total. We should avoid preempt from a queue if it is + // already // <= its guaranteed resource. Resource minimumQueueResource = Resources.max(rc, clusterResource, Resources.min(rc, clusterResource, totalResource, getGuaranteed()), @@ -233,33 +220,26 @@ public class TempQueuePerPartition { if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, minimumQueueResource)) { toBePreempted = Resources.multiply( - Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor); + Resources.subtract(usedDeductKillable, minimumQueueResource), + scalingFactor); } else { toBePreempted = Resources.none(); } } - public Resource getActuallyToBePreempted() { - return actuallyToBePreempted; - } - - public void setActuallyToBePreempted(Resource res) { - this.actuallyToBePreempted = res; - } - public void deductActuallyToBePreempted(ResourceCalculator rc, Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) { - Resources.subtractFrom(actuallyToBePreempted, toBeDeduct); + if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(), + toBeDeduct)) { + Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } - actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted, - Resources.none()); + setActuallyToBePreempted(Resources.max(rc, cluster, + getActuallyToBePreempted(), Resources.none())); } void appendLogString(StringBuilder sb) { - sb.append(queueName).append(", ") - .append(current.getMemorySize()).append(", ") - .append(current.getVirtualCores()).append(", ") + sb.append(queueName).append(", ").append(current.getMemorySize()) + .append(", ").append(current.getVirtualCores()).append(", ") .append(pending.getMemorySize()).append(", ") .append(pending.getVirtualCores()).append(", ") .append(getGuaranteed().getMemorySize()).append(", ") @@ -267,9 +247,17 @@ public class TempQueuePerPartition { .append(idealAssigned.getMemorySize()).append(", ") .append(idealAssigned.getVirtualCores()).append(", ") .append(toBePreempted.getMemorySize()).append(", ") - .append(toBePreempted.getVirtualCores() ).append(", ") - .append(actuallyToBePreempted.getMemorySize()).append(", ") - .append(actuallyToBePreempted.getVirtualCores()); + .append(toBePreempted.getVirtualCores()).append(", ") + .append(getActuallyToBePreempted().getMemorySize()).append(", ") + .append(getActuallyToBePreempted().getVirtualCores()); + } + + public void addAllApps(Collection orderedApps) { + this.apps = orderedApps; + } + + public Collection getApps() { + return apps; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6db5074f233..cea5aa46301 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur private static final String PREEMPTION_CONFIG_PREFIX = "yarn.resourcemanager.monitor.capacity.preemption."; + private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX = + "intra-queue-preemption."; + /** If true, run the policy but do not affect the cluster with preemption and * kill events. */ public static final String PREEMPTION_OBSERVE_ONLY = @@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = false; + + /** + * For intra-queue preemption, priority/user-limit/fairness based selectors + * can help to preempt containers. + */ + public static final String INTRAQUEUE_PREEMPTION_ENABLED = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled"; + public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false; + + /** + * For intra-queue preemption, consider those queues which are above used cap + * limit. + */ + public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold"; + public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD = + 0.5f; + + /** + * For intra-queue preemption, allowable maximum-preemptable limit per queue. + */ + public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = + PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit"; + public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = + 0.2f; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 20378bbcc8d..eef43c308eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -73,6 +73,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1029,8 +1030,9 @@ public class LeafQueue extends AbstractCSQueue { Resource clusterResource, FiCaSchedulerApp application, String partition) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application, clusterResource, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); + computeUserLimit(application.getUser(), clusterResource, user, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + partition); } private Resource getHeadroom(User user, @@ -1101,7 +1103,7 @@ public class LeafQueue extends AbstractCSQueue { // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also Resource userLimit = - computeUserLimit(application, clusterResource, queueUser, + computeUserLimit(application.getUser(), clusterResource, queueUser, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1139,7 +1141,7 @@ public class LeafQueue extends AbstractCSQueue { } @Lock(NoLock.class) - private Resource computeUserLimit(FiCaSchedulerApp application, + private Resource computeUserLimit(String userName, Resource clusterResource, User user, String nodePartition, SchedulingMode schedulingMode) { Resource partitionResource = labelManager.getResourceByLabel(nodePartition, @@ -1239,7 +1241,6 @@ public class LeafQueue extends AbstractCSQueue { minimumAllocation); if (LOG.isDebugEnabled()) { - String userName = application.getUser(); LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + " userLimitPercent=" + userLimit + @@ -1815,11 +1816,22 @@ public class LeafQueue extends AbstractCSQueue { /** * Obtain (read-only) collection of active applications. */ - public Collection getApplications() { + public synchronized Collection getApplications() { return Collections.unmodifiableCollection(orderingPolicy .getSchedulableEntities()); } + /** + * Obtain (read-only) collection of all applications. + */ + public synchronized Collection getAllApplications() { + Collection apps = new HashSet( + pendingOrderingPolicy.getSchedulableEntities()); + apps.addAll(orderingPolicy.getSchedulableEntities()); + + return Collections.unmodifiableCollection(apps); + } + // Consider the headroom for each user in the queue. // Total pending for the queue = // sum(for each user(min((user's headroom), sum(user's pending requests)))) @@ -1833,7 +1845,7 @@ public class LeafQueue extends AbstractCSQueue { if (!userNameToHeadroom.containsKey(userName)) { User user = getUser(userName); Resource headroom = Resources.subtract( - computeUserLimit(app, resources, user, partition, + computeUserLimit(app.getUser(), resources, user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), user.getUsed(partition)); // Make sure headroom is not negative. @@ -1851,6 +1863,16 @@ public class LeafQueue extends AbstractCSQueue { return pendingConsideringUserLimit; } + public synchronized Resource getUserLimitPerUser(String userName, + Resource resources, String partition) { + + // Check user resource limit + User user = getUser(userName); + + return computeUserLimit(userName, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + @Override public synchronized void collectSchedulerApplications( Collection apps) { @@ -1901,8 +1923,8 @@ public class LeafQueue extends AbstractCSQueue { } /** - * return all ignored partition exclusivity RMContainers in the LeafQueue, this - * will be used by preemption policy, and use of return + * @return all ignored partition exclusivity RMContainers in the LeafQueue, + * this will be used by preemption policy, and use of return * ignorePartitionExclusivityRMContainer should protected by LeafQueue * synchronized lock */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 38995ff660b..b7b47b0d958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -312,6 +313,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return ret; } + public synchronized Map getTotalPendingRequestsPerPartition() { + + Map ret = new HashMap(); + Resource res = null; + for (Priority priority : appSchedulingInfo.getPriorities()) { + ResourceRequest rr = appSchedulingInfo.getResourceRequest(priority, "*"); + if ((res = ret.get(rr.getNodeLabelExpression())) == null) { + res = Resources.createResource(0, 0); + ret.put(rr.getNodeLabelExpression(), res); + } + + Resources.addTo(res, + Resources.multiply(rr.getCapability(), rr.getNumContainers())); + } + return ret; + } + public synchronized void markContainerForPreemption(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index e60c384b4dc..3c1d9f0571b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -63,11 +63,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; @@ -160,13 +163,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { mClock); } - private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, - String queueName, List reservedContainers, - List liveContainers) { + private void mockContainers(String containersConfig, FiCaSchedulerApp app, + ApplicationAttemptId attemptId, String queueName, + List reservedContainers, List liveContainers) { int containerId = 1; int start = containersConfig.indexOf("=") + 1; int end = -1; + Resource used = Resource.newInstance(0, 0); + Resource pending = Resource.newInstance(0, 0); + Priority pri = Priority.newInstance(0); + while (start < containersConfig.length()) { while (start < containersConfig.length() && containersConfig.charAt(start) != '(') { @@ -188,41 +195,50 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { // now we found start/end, get container values String[] values = containersConfig.substring(start + 1, end).split(","); - if (values.length != 6) { + if (values.length < 6 || values.length > 8) { throw new IllegalArgumentException("Format to define container is:" - + "(priority,resource,host,expression,repeat,reserved)"); + + "(priority,resource,host,expression,repeat,reserved, pending)"); } - Priority pri = Priority.newInstance(Integer.valueOf(values[0])); + pri.setPriority(Integer.valueOf(values[0])); Resource res = parseResourceFromString(values[1]); NodeId host = NodeId.newInstance(values[2], 1); - String exp = values[3]; + String label = values[3]; + String userName = "user"; int repeat = Integer.valueOf(values[4]); boolean reserved = Boolean.valueOf(values[5]); + if (values.length >= 7) { + Resources.addTo(pending, parseResourceFromString(values[6])); + } + if (values.length == 8) { + userName = values[7]; + } for (int i = 0; i < repeat; i++) { Container c = mock(Container.class); + Resources.addTo(used, res); when(c.getResource()).thenReturn(res); when(c.getPriority()).thenReturn(pri); RMContainerImpl rmc = mock(RMContainerImpl.class); when(rmc.getAllocatedNode()).thenReturn(host); - when(rmc.getNodeLabelExpression()).thenReturn(exp); + when(rmc.getNodeLabelExpression()).thenReturn(label); when(rmc.getAllocatedResource()).thenReturn(res); when(rmc.getContainer()).thenReturn(c); when(rmc.getApplicationAttemptId()).thenReturn(attemptId); when(rmc.getQueueName()).thenReturn(queueName); - final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); - when(rmc.getContainerId()).thenReturn( - cId); + final ContainerId cId = ContainerId.newContainerId(attemptId, + containerId); + when(rmc.getContainerId()).thenReturn(cId); doAnswer(new Answer() { @Override public Integer answer(InvocationOnMock invocation) throws Throwable { - return cId.compareTo(((RMContainer) invocation.getArguments()[0]) - .getContainerId()); + return cId.compareTo( + ((RMContainer) invocation.getArguments()[0]).getContainerId()); } }).when(rmc).compareTo(any(RMContainer.class)); if (containerId == 1) { when(rmc.isAMContainer()).thenReturn(true); + when(app.getAMResource(label)).thenReturn(res); } if (reserved) { @@ -237,25 +253,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { // If this is a non-exclusive allocation String partition = null; - if (exp.isEmpty() + if (label.isEmpty() && !(partition = nodeIdToSchedulerNodes.get(host).getPartition()) - .isEmpty()) { + .isEmpty()) { LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); - Map> ignoreExclusivityContainers = - queue.getIgnoreExclusivityRMContainers(); + Map> ignoreExclusivityContainers = queue + .getIgnoreExclusivityRMContainers(); if (!ignoreExclusivityContainers.containsKey(partition)) { ignoreExclusivityContainers.put(partition, new TreeSet()); } ignoreExclusivityContainers.get(partition).add(rmc); } - LOG.debug("add container to app=" + attemptId + " res=" + res - + " node=" + host + " nodeLabelExpression=" + exp + " partition=" + LOG.debug("add container to app=" + attemptId + " res=" + res + " node=" + + host + " nodeLabelExpression=" + label + " partition=" + partition); containerId++; } + // Some more app specific aggregated data can be better filled here. + when(app.getPriority()).thenReturn(pri); + when(app.getUser()).thenReturn(userName); + when(app.getCurrentConsumption()).thenReturn(used); + when(app.getCurrentReservation()) + .thenReturn(Resources.createResource(0, 0)); + + Map pendingForDefaultPartition = + new HashMap(); + // Add for default partition for now. + pendingForDefaultPartition.put(label, pending); + when(app.getTotalPendingRequestsPerPartition()) + .thenReturn(pendingForDefaultPartition); + + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setUsed(label, used); + when(app.getAppAttemptResourceUsage()).thenReturn(ru); + start = end + 1; } } @@ -271,6 +306,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { */ private void mockApplications(String appsConfig) { int id = 1; + HashMap> userMap = new HashMap>(); + LeafQueue queue = null; for (String a : appsConfig.split(";")) { String[] strs = a.split("\t"); String queueName = strs[0]; @@ -279,24 +316,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { List liveContainers = new ArrayList(); List reservedContainers = new ArrayList(); ApplicationId appId = ApplicationId.newInstance(0L, id); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - - mockContainers(strs[1], appAttemptId, queueName, reservedContainers, - liveContainers); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + when(app.getAMResource(anyString())) + .thenReturn(Resources.createResource(0, 0)); + mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers, + liveContainers); + LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId); + when(app.getLiveContainers()).thenReturn(liveContainers); when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationId()).thenReturn(appId); - when(app.getPriority()).thenReturn(Priority.newInstance(0)); // add to LeafQueue - LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); + queue = (LeafQueue) nameToCSQueues.get(queueName); queue.getApplications().add(app); + queue.getAllApplications().add(app); + HashSet users = userMap.get(queueName); + if (null == users) { + users = new HashSet(); + userMap.put(queueName, users); + } + + users.add(app.getUser()); id++; } + + for (String queueName : userMap.keySet()) { + queue = (LeafQueue) nameToCSQueues.get(queueName); + // Currently we have user-limit test support only for default label. + Resource totResoucePerPartition = partitionToResource.get(""); + Resource capacity = Resources.multiply(totResoucePerPartition, + queue.getQueueCapacities().getAbsoluteCapacity()); + HashSet users = userMap.get(queue.getQueueName()); + Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); + for (String user : users) { + when(queue.getUserLimitPerUser(eq(user), any(Resource.class), + anyString())).thenReturn(userLimit); + } + } } private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container, @@ -430,10 +492,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { - return a1.getApplicationId().compareTo(a2.getApplicationId()); + if (a1.getPriority() != null + && !a1.getPriority().equals(a2.getPriority())) { + return a1.getPriority().compareTo(a2.getPriority()); + } + + int res = a1.getApplicationId() + .compareTo(a2.getApplicationId()); + return res; } }); when(leafQueue.getApplications()).thenReturn(apps); + when(leafQueue.getAllApplications()).thenReturn(apps); OrderingPolicy so = mock(OrderingPolicy.class); when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -512,10 +582,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { float absUsed = Resources.divide(rc, totResoucePerPartition, parseResourceFromString(values[2].trim()), totResoucePerPartition) + epsilon; + float used = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[2].trim()), + parseResourceFromString(values[0].trim())) + epsilon; Resource pending = parseResourceFromString(values[3].trim()); qc.setAbsoluteCapacity(partitionName, absGuaranteed); qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteUsedCapacity(partitionName, absUsed); + qc.setUsedCapacity(partitionName, used); + when(queue.getUsedCapacity()).thenReturn(used); ru.setPending(partitionName, pending); if (!isParent(queueExprArray, idx)) { LeafQueue lq = (LeafQueue) queue; @@ -530,6 +605,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { reserved = parseResourceFromString(values[4].trim()); ru.setReserved(partitionName, reserved); } + LOG.debug("Setup queue=" + queueName + " partition=" + partitionName + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + ",abs_used" + absUsed + ",pending_resource=" + pending diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java new file mode 100644 index 00000000000..19fb0d293d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -0,0 +1,868 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test class for IntraQueuePreemption scenarios. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueue + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemption() throws IOException { + /** + * The simplest test preemption, Queue structure is: + * + *
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * 
+ * + * Guaranteed resource of a/b/c/d are 11:40:20:29 Total cluster resource = + * 100 + * Scenario: + * Queue B has few running apps and two high priority apps have demand. + * Apps which are running at low priority (4) will preempt few of its + * resources to meet the demand. + */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 120 0]);" + // root + "-a(=[11 100 11 50 0]);" + // a + "-b(=[40 100 38 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[29 100 20 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(4,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(5,1,n1,,1,false,10);" + // app5 b + "b\t" // app4 in b + + "(6,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority. Hence take 8 + // containers from them by hitting the intraQueuePreemptionDemand of 20%. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNoPreemptionForSamePriorityApps() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * 
+ * + * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource = + * 100 + * Scenario: In queue A/B, all apps are running at same priority. However + * there are many demands also from these apps. Since all apps are at same + * priority, preemption should not occur here. + */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 120 0]);" + // root + "-a(=[10 100 10 50 0]);" + // a + "-b(=[40 100 40 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[30 100 20 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(1,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(1,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(1,1,n1,,1,false,20);" + // app5 b + "b\t" // app4 in b + + "(1,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, none of the apps should be preempted. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(6)))); + } + + @Test + public void testNoPreemptionWhenQueueIsUnderCapacityLimit() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 BY + * default, this limit is 50%. Test to verify that there wont be any + * preemption since used capacity is under 50% for queue a/b even though + * there are demands from high priority apps in queue. + */ + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 35 80 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 25 30 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app1 in a + + "(6,1,n1,,5,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue A/B, none of the apps should be preempted as used capacity + // is under 50%. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify + * that the maximum preemption should occur upto 50%, eventhough demand is + * more. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 45 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app1 in a + + "(6,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queueB, eventhough app4 needs 100 resources, only 30 resources were + // preempted. (max is 50% of guaranteed cap of any queue + // "maxIntraQueuePreemptable") + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testLimitPreemptionWithTotalPreemptedResourceAllowed() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * totalPreemption allowed is 10%. This test is to verify that only + * 10% is preempted. + */ + + // report "ideal" preempt as 10%. Ensure preemption happens only for 10% + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + (float) 0.1); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[40 100 10 50 0]);" + // a + "-b(=[60 100 45 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,40,false,20);" + // app3 b + "b\t" // app1 in a + + "(6,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B eventhough app4 needs 100 resources, only 10 resources were + // preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testAlreadySelectedContainerFromInterQueuePreemption() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * QueueB is under utilized and QueueA has to release 9 containers here. + * However within queue A, high priority app has also a demand for 20. + * So additional 11 more containers will be preempted making a tota of 20. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 95 170 0]);" + // root + "-a(=[60 100 70 50 0]);" + // a + "-b(=[40 100 25 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,15);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,20,false,20);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,20,false,20);" + // app3 b + "b\t" // app1 in a + + "(4,1,n1,,5,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // As per intra queue preemption algorithm, 20 more containers were needed + // for app2 (in queue a). Inter queue pre-emption had already preselected 9 + // containers and hence preempted only 11 more. + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testSkipAMContainersInInterQueuePreemption() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 60:40 Total cluster resource = 100 + * While preempting containers during intra-queue preemption, AM containers + * will be spared for now. Verify the same. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 170 0]);" + // root + "-a(=[60 100 60 50 0]);" + // a + "-b(=[40 100 40 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a + + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a + + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b + + "(4,1,n1,,20,false,20);" + "b\t" // app5 in a + + "(4,1,n1,,20,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Ensure that only 9 containers are preempted from app2 (sparing 1 AM) + verify(mDisp, times(11)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testSkipAMContainersInInterQueuePreemptionSingleApp() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 50:50 Total cluster resource = 100 + * Spare Am container from a lower priority app during its preemption + * cycle. Eventhough there are more demand and no other low priority + * apps are present, still AM contaier need to soared. + */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 170 0]);" + // root + "-a(=[50 100 50 50 0]);" + // a + "-b(=[50 100 50 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "a\t" // app1 in a + + "(2,1,n1,,40,false,10);" + "b\t" // app2 in a + + "(4,1,n1,,20,false,20);" + "b\t" // app3 in b + + "(4,1,n1,,30,false,100)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Make sure that app1's Am container is spared. Only 9/10 is preempted. + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testNoPreemptionForSingleApp() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 60:40 Total cluster resource = 100 + * Only one app is running in queue. And it has more demand but no + * resource are available in queue. Preemption must not occur here. + */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 20 50 0]);" + // root + "-a(=[60 100 20 50 0]);" + // a + "-b(=[40 100 0 0 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(4,1,n1,,20,false,50)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Ensure there are 0 preemptions since only one app is running in queue. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testOverutilizedQueueResourceWithInterQueuePreemption() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * Scenario: + * Guaranteed resource of a/b are 20:80 Total cluster resource = 100 + * QueueB is under utilized and 20 resource will be released from queueA. + * 10 containers will also selected for intra-queue too but it will be + * pre-selected. + */ + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 0]);" + // root + "-a(=[20 100 100 30 0]);" + // a + "-b(=[80 100 0 20 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,0);" + "a\t" // app1 in a + + "(3,1,n1,,50,false,30);" + "b\t" // app2 in a + + "(4,1,n1,,0,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Complete demand request from QueueB for 20 resource must be preempted. + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNodePartitionIntraQueuePreemption() throws IOException { + /** + * The simplest test of node label, Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Scenario: + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster, + * app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 50 + * NO_LABEL, app3 uses 50 x, app4 uses 50 NO_LABEL. a has 20 pending + * resource for x for app3 of priority 2 + * + * After preemption, it should preempt 20 from app1 + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + // root + "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a + "-b(=[50 100 50 50],x=[50 100 50 50])"; // b + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,50,false,10);" + // 50 * x in n1 + "a\t" // app2 in a + + "(2,1,n1,x,0,false,20);" + // 0 * x in n1 + "a\t" // app2 in a + + "(1,1,n2,,50,false);" + // 50 default in n2 + "b\t" // app3 in b + + "(1,1,n1,x,50,false);" + // 50 * x in n1 + "b\t" // app4 in b + + "(1,1,n2,,50,false)"; // 50 default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 20 preempted from app1 + verify(mDisp, times(20)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, never()) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + verify(mDisp, never()) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); + } + + @Test + public void testComplexIntraQueuePreemption() throws IOException { + /** + * The complex test preemption, Queue structure is: + * + *
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource = + * 100 + * All queues under its capacity, but within each queue there are many + * under served applications. + */ + + // report "ideal" preempt as 50%. Ensure preemption happens only for 50% + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + (float) 0.5); + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 75 130 0]);" + // root + "-a(=[10 100 5 50 0]);" + // a + "-b(=[40 100 35 60 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[30 100 25 10 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" + + "(4,1,n1,,0,false,25);" + // app2 a + "a\t" + + "(5,1,n1,,0,false,2);" + // app3 a + "b\t" + + "(3,1,n1,,5,false,20);" + // app4 b + "b\t" + + "(4,1,n1,,15,false,10);" + // app5 b + "b\t" + + "(4,1,n1,,10,false,10);" + // app6 b + "b\t" + + "(5,1,n1,,3,false,5);" + // app7 b + "b\t" + + "(5,1,n1,,0,false,2);" + // app8 b + "b\t" + + "(6,1,n1,,2,false,10);" + // app9 in b + "c\t" + + "(1,1,n1,,8,false,10);" + // app10 in c + "c\t" + + "(1,1,n1,,2,false,5);" + // app11 in c + "c\t" + + "(2,1,n1,,0,false,3);" + "d\t" // app12 in c + + "(2,1,n1,,25,false,0);" + "d\t" // app13 in d + + "(1,1,n1,,0,false,20)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // High priority app in queueA has 30 resource demand. But low priority + // app has only 5 resource. Hence preempt 4 here sparing AM. + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + // Multiple high priority apps has demand of 17. This will be preempted + // from another set of low priority apps. + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(6)))); + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + // Only 3 resources will be freed in this round for queue C as we + // are trying to save AM container. + verify(mDisp, times(2)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(10)))); + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(11)))); + } + + @Test + public void testIntraQueuePreemptionWithTwoUsers() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *      /   \
+     *     a     b
+     * 
+ * + * Scenario: + * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 + * Consider 2 users in a queue, assume minimum user limit factor is 50%. + * Hence in queueB of 40, each use has a quota of 20. app4 of high priority + * has a demand of 30 and its already using 5. Adhering to userlimit only + * 15 more must be preempted. If its same user,20 would have been preempted + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 55 170 0]);" + // root + "-a(=[60 100 10 50 0]);" + // a + "-b(=[40 100 40 120 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,5,false,25);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,35,false,20,user1);" + // app3 b + "b\t" // app4 in b + + "(6,1,n1,,5,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Considering user-limit of 50% since only 2 users are there, only preempt + // 15 more (5 is already running) eventhough demand is for 30. + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testComplexNodePartitionIntraQueuePreemption() + throws IOException { + /** + * The simplest test of node label, Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Scenario: + * Both a/b can access x, and guaranteed capacity of them is 50:50. Two + * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster, + * app1-app4 in a, and app5-app9 in b. + * + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;" + // default partition + "x=100,true"; // partition=x + String nodesConfig = "n1=x;" + // n1 has partition=x + "n2="; // n2 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100],x=[100 100 100 100]);" + // root + "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a + "-b(=[50 100 35 50],x=[50 100 50 50])"; // b + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,x,35,false,10);" + // 20 * x in n1 + "a\t" // app2 in a + + "(1,1,n1,x,5,false,10);" + // 20 * x in n1 + "a\t" // app3 in a + + "(2,1,n1,x,0,false,20);" + // 0 * x in n1 + "a\t" // app4 in a + + "(1,1,n2,,50,false);" + // 50 default in n2 + "b\t" // app5 in b + + "(1,1,n1,x,50,false);" + // 50 * x in n1 + "b\t" // app6 in b + + "(1,1,n2,,25,false);" + // 25 * default in n2 + "b\t" // app7 in b + + "(1,1,n2,,3,false);" + // 3 * default in n2 + "b\t" // app8 in b + + "(1,1,n2,,2,false);" + // 2 * default in n2 + "b\t" // app9 in b + + "(5,1,n2,,5,false,30)"; // 50 default in n2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Label X: app3 has demand of 20 for label X. Hence app2 will loose + // 4 (sparing AM) and 16 more from app1 till preemption limit is met. + verify(mDisp, times(16)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(4)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); + + // Default Label:For a demand of 30, preempt from all low priority + // apps of default label. 25 will be preempted as preemption limit is + // met. + verify(mDisp, times(1)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8)))); + verify(mDisp, times(2)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7)))); + verify(mDisp, times(22)) + .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6)))); + } +}