From c583ab02c730be0a63d974039a78f2dc67dc2db6 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 22 May 2017 14:26:13 -0700 Subject: [PATCH] YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Sunil G via wangda) Change-Id: I9b19f69788068be05b3295247cdd7b972f8a573c --- .../resource/DefaultResourceCalculator.java | 5 + .../resource/DominantResourceCalculator.java | 5 + .../util/resource/ResourceCalculator.java | 9 + .../hadoop/yarn/util/resource/Resources.java | 5 + .../CapacitySchedulerPreemptionContext.java | 5 + .../CapacitySchedulerPreemptionUtils.java | 9 +- .../FifoIntraQueuePreemptionPlugin.java | 323 +++++-- .../IntraQueueCandidatesSelector.java | 112 ++- .../IntraQueuePreemptionComputePlugin.java | 10 +- .../ProportionalCapacityPreemptionPolicy.java | 25 +- .../monitor/capacity/TempAppPerPartition.java | 6 +- .../capacity/TempQueuePerPartition.java | 14 + .../capacity/TempUserPerPartition.java | 88 ++ .../CapacitySchedulerConfiguration.java | 8 + .../scheduler/capacity/LeafQueue.java | 11 +- .../scheduler/capacity/UsersManager.java | 11 +- ...CapacityPreemptionPolicyMockFramework.java | 89 +- ...nalCapacityPreemptionPolicyIntraQueue.java | 30 +- ...tyPreemptionPolicyIntraQueueUserLimit.java | 899 ++++++++++++++++++ ...cityPreemptionPolicyIntraQueueWithDRF.java | 178 ++++ 20 files changed, 1686 insertions(+), 156 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index ef7229c6227..524a04945ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -121,4 +121,9 @@ public class DefaultResourceCalculator extends ResourceCalculator { Resource smaller, Resource bigger) { return smaller.getMemorySize() <= bigger.getMemorySize(); } + + @Override + public boolean isAnyMajorResourceZero(Resource resource) { + return resource.getMemorySize() == 0f; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 032aa020fd6..69fe716c96e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -239,4 +239,9 @@ public class DominantResourceCalculator extends ResourceCalculator { return smaller.getMemorySize() <= bigger.getMemorySize() && smaller.getVirtualCores() <= bigger.getVirtualCores(); } + + @Override + public boolean isAnyMajorResourceZero(Resource resource) { + return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index a2f85b314cc..d219fe10593 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -204,4 +204,13 @@ public abstract class ResourceCalculator { */ public abstract boolean fitsIn(Resource cluster, Resource smaller, Resource bigger); + + /** + * Check whether any major resource type (a resource type tracked by all + * NodeManagers, such as memory or vcores) has a zero value. + * + * @param resource resource to check + * @return true if any major resource type is zero. + */ + public abstract boolean isAnyMajorResourceZero(Resource resource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 702030030ce..68db19490d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -352,4 +352,9 @@ public class Resources { return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()), Math.max(lhs.getVirtualCores(), rhs.getVirtualCores())); } + + public static boolean isAnyMajorResourceZero(ResourceCalculator rc, + Resource resource) { + return rc.isAnyMajorResourceZero(resource); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index 982b1f15e7d..d6f3f6c6d42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -63,4 +65,7 @@ interface CapacitySchedulerPreemptionContext { float getMinimumThresholdForIntraQueuePreemption(); float getMaxAllowableLimitForIntraQueuePreemption(); + + @Unstable + IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index abad2a149d7..0ae3ef01340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -99,7 +99,7 @@ public class CapacitySchedulerPreemptionUtils { } deductPreemptableResourcePerApp(context, tq.totalPartitionResource, - tas, res, partition); + tas, res); } } } @@ -108,10 +108,10 @@ public class CapacitySchedulerPreemptionUtils { private static void deductPreemptableResourcePerApp( CapacitySchedulerPreemptionContext context, Resource totalPartitionResource, Collection tas, - Resource res, String partition) { + Resource res) { for (TempAppPerPartition ta : tas) { ta.deductActuallyToBePreempted(context.getResourceCalculator(), - totalPartitionResource, res, partition); + totalPartitionResource, res); } } @@ -157,7 +157,8 @@ public class CapacitySchedulerPreemptionUtils { && Resources.greaterThan(rc, clusterResource, toObtainByPartition, Resources.none()) && Resources.fitsIn(rc, clusterResource, - rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + rmContainer.getAllocatedResource(), totalPreemptionAllowed) + && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) { Resources.subtractFrom(toObtainByPartition, rmContainer.getAllocatedResource()); Resources.subtractFrom(totalPreemptionAllowed, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 5f1af1e8e39..4bf6760287d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -18,11 +18,13 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -33,7 +35,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -60,6 +64,26 @@ public class FifoIntraQueuePreemptionPlugin this.rc = rc; } + @Override + public Collection getPreemptableApps(String queueName, + String partition) { + TempQueuePerPartition tq = context.getQueueByPartition(queueName, + partition); + + List apps = new ArrayList(); + for (TempAppPerPartition tmpApp : tq.getApps()) { + // If a lower priority app was not selected to get preempted, mark such + // apps out from preemption candidate selection. + if (Resources.equals(tmpApp.getActuallyToBePreempted(), + Resources.none())) { + continue; + } + + apps.add(tmpApp.app); + } + return apps; + } + @Override public Map getResourceDemandFromAppsPerQueue( String queueName, String partition) { @@ -90,7 +114,7 @@ public class FifoIntraQueuePreemptionPlugin @Override public void computeAppsIdealAllocation(Resource clusterResource, - Resource partitionBasedResource, TempQueuePerPartition tq, + TempQueuePerPartition tq, Map> selectedCandidates, Resource totalPreemptedResourceAllowed, Resource queueReassignableResource, float maxAllowablePreemptLimit) { @@ -113,17 +137,15 @@ public class FifoIntraQueuePreemptionPlugin // 3. Create all tempApps for internal calculation and return a list from // high priority to low priority order. - TAPriorityComparator taComparator = new TAPriorityComparator(); - PriorityQueue orderedByPriority = - createTempAppForResCalculation(tq.partition, apps, taComparator); + PriorityQueue orderedByPriority = createTempAppForResCalculation( + tq, apps, clusterResource, perUserAMUsed); // 4. Calculate idealAssigned per app by checking based on queue's // unallocated resource. Also return apps arranged from lower priority to // higher priority. - TreeSet orderedApps = - calculateIdealAssignedResourcePerApp(clusterResource, - partitionBasedResource, tq, selectedCandidates, - queueReassignableResource, orderedByPriority, perUserAMUsed); + TreeSet orderedApps = calculateIdealAssignedResourcePerApp( + clusterResource, tq, selectedCandidates, queueReassignableResource, + orderedByPriority); // 5. A configurable limit that could define an ideal allowable preemption // limit. Based on the current queue's capacity, define how much % could become @@ -146,7 +168,7 @@ public class FifoIntraQueuePreemptionPlugin // 7. From lowest priority app onwards, calculate toBePreempted resource // based on demand. 
calculateToBePreemptedResourcePerApp(clusterResource, orderedApps, - preemptionLimit); + Resources.clone(preemptionLimit)); // Save all apps (low to high) to temp queue for further reference tq.addAllApps(orderedApps); @@ -154,7 +176,8 @@ public class FifoIntraQueuePreemptionPlugin // 8. There is a chance that we may preempt to satisfy demand from the same // priority level; such cases are to be validated out. validateOutSameAppPriorityFromDemand(clusterResource, - (TreeSet) tq.getApps()); + (TreeSet) orderedApps, tq.getUsersPerPartition(), + context.getIntraQueuePreemptionOrderPolicy()); if (LOG.isDebugEnabled()) { LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); @@ -177,17 +200,17 @@ public class FifoIntraQueuePreemptionPlugin Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(), tmpApp.idealAssigned); - Resources.subtractFrom(preemtableFromApp, tmpApp.selected); - Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed()); + Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected); + Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed()); // Calculate toBePreempted from apps as follows: // app.preemptable = min(max(app.used - app.selected - app.ideal, 0), // intra_q_preemptable) tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources .max(rc, clusterResource, preemtableFromApp, Resources.none()), - preemptionLimit); + Resources.clone(preemptionLimit)); - preemptionLimit = Resources.subtract(preemptionLimit, + preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit, tmpApp.toBePreempted); } } @@ -222,31 +245,24 @@ public class FifoIntraQueuePreemptionPlugin * } * * @param clusterResource Cluster Resource - * @param partitionBasedResource resource per partition * @param tq TempQueue * @param selectedCandidates Already Selected preemption candidates * @param queueReassignableResource Resource used in a queue * @param orderedByPriority List of running apps - * @param perUserAMUsed AM used resource * @return List of temp apps ordered from low to high priority */ private TreeSet calculateIdealAssignedResourcePerApp( - Resource clusterResource, Resource partitionBasedResource, - TempQueuePerPartition tq, + Resource clusterResource, TempQueuePerPartition tq, Map> selectedCandidates, Resource queueReassignableResource, - PriorityQueue orderedByPriority, - Map perUserAMUsed) { + PriorityQueue orderedByPriority) { Comparator reverseComp = Collections .reverseOrder(new TAPriorityComparator()); TreeSet orderedApps = new TreeSet<>(reverseComp); - Map userIdealAssignedMapping = new HashMap<>(); String partition = tq.partition; - - Map preCalculatedUserLimit = - new HashMap(); + Map usersPerPartition = tq.getUsersPerPartition(); while (!orderedByPriority.isEmpty()) { // Remove app from the next highest remaining priority and process it to @@ -256,44 +272,19 @@ public class FifoIntraQueuePreemptionPlugin // Once unallocated resource is 0, we can stop assigning ideal per app. if (Resources.lessThanOrEqual(rc, clusterResource, - queueReassignableResource, Resources.none())) { + queueReassignableResource, Resources.none()) + || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) { continue; } String userName = tmpApp.app.getUser(); - Resource userLimitResource = preCalculatedUserLimit.get(userName); - - // Verify whether we already calculated headroom for this user. 
- if (userLimitResource == null) { - userLimitResource = Resources.clone( - tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource, - partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); - - Resource amUsed = perUserAMUsed.get(userName); - if (null == amUsed) { - amUsed = Resources.createResource(0, 0); - } - - // Real AM used need not have to be considered for user-limit as well. - userLimitResource = Resources.subtract(userLimitResource, amUsed); - if (LOG.isDebugEnabled()) { - LOG.debug("Userlimit for user '" + userName + "' is :" - + userLimitResource + ", and amUsed is:" + amUsed); - } - - preCalculatedUserLimit.put(userName, userLimitResource); - } - - Resource idealAssignedForUser = userIdealAssignedMapping.get(userName); - - if (idealAssignedForUser == null) { - idealAssignedForUser = Resources.createResource(0, 0); - userIdealAssignedMapping.put(userName, idealAssignedForUser); - } + TempUserPerPartition tmpUser = usersPerPartition.get(userName); + Resource userLimitResource = tmpUser.getUserLimit(); + Resource idealAssignedForUser = tmpUser.idealAssigned; // Calculate total selected container resources from current app. - getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, - tmpApp, partition); + getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp, + tmpUser, partition); // For any app, used+pending will give its idealAssigned. However it will // be tightly linked to queue's unallocated quota. So lower priority apps @@ -304,10 +295,11 @@ public class FifoIntraQueuePreemptionPlugin if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, userLimitResource)) { - appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned, + Resource idealAssigned = Resources.min(rc, clusterResource, + appIdealAssigned, Resources.subtract(userLimitResource, idealAssignedForUser)); tmpApp.idealAssigned = Resources.clone(Resources.min(rc, - clusterResource, queueReassignableResource, appIdealAssigned)); + clusterResource, queueReassignableResource, idealAssigned)); Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned); } else { continue; @@ -322,7 +314,8 @@ public class FifoIntraQueuePreemptionPlugin Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected)); } - Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned); + Resources.subtractFromNonNegative(queueReassignableResource, + tmpApp.idealAssigned); } return orderedApps; @@ -334,7 +327,8 @@ public class FifoIntraQueuePreemptionPlugin */ private void getAlreadySelectedPreemptionCandidatesResource( Map> selectedCandidates, - TempAppPerPartition tmpApp, String partition) { + TempAppPerPartition tmpApp, TempUserPerPartition tmpUser, + String partition) { tmpApp.selected = Resources.createResource(0, 0); Set containers = selectedCandidates .get(tmpApp.app.getApplicationAttemptId()); @@ -346,16 +340,23 @@ public class FifoIntraQueuePreemptionPlugin for (RMContainer cont : containers) { if (partition.equals(cont.getNodeLabelExpression())) { Resources.addTo(tmpApp.selected, cont.getAllocatedResource()); + Resources.addTo(tmpUser.selected, cont.getAllocatedResource()); } } } private PriorityQueue createTempAppForResCalculation( - String partition, Collection apps, - TAPriorityComparator taComparator) { + TempQueuePerPartition tq, Collection apps, + Resource clusterResource, + Map perUserAMUsed) { + TAPriorityComparator taComparator = new TAPriorityComparator(); PriorityQueue orderedByPriority = new PriorityQueue<>( 100, taComparator); + String 
partition = tq.partition; + Map usersPerPartition = tq + .getUsersPerPartition(); + // have an internal temp app structure to store intermediate data (priority) for (FiCaSchedulerApp app : apps) { @@ -387,56 +388,156 @@ public class FifoIntraQueuePreemptionPlugin tmpApp.idealAssigned = Resources.createResource(0, 0); orderedByPriority.add(tmpApp); + + // Create a TempUserPerPartition structure to hold more information + // regarding each user's entities such as UserLimit etc. This could + // be kept in a user to TempUserPerPartition map for further reference. + String userName = app.getUser(); + if (!usersPerPartition.containsKey(userName)) { + ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) + .getResourceUsage(); + + TempUserPerPartition tmpUser = new TempUserPerPartition( + tq.leafQueue.getUser(userName), tq.queueName, + Resources.clone(userResourceUsage.getUsed(partition)), + Resources.clone(perUserAMUsed.get(userName)), + Resources.clone(userResourceUsage.getReserved(partition)), + Resources.none()); + + Resource userLimitResource = Resources.clone( + tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + + // The AM's real usage need not be considered for the user-limit either. + userLimitResource = Resources.subtract(userLimitResource, + tmpUser.amUsed); + tmpUser.setUserLimit(userLimitResource); + + if (LOG.isDebugEnabled()) { + LOG.debug("TempUser:" + tmpUser); + } + + tmpUser.idealAssigned = Resources.createResource(0, 0); + tq.addUserPerPartition(userName, tmpUser); + } } return orderedByPriority; } /* * Fifo+Priority based preemption policy need not preempt resources at - * same priority level. Such cases will be validated out. + * same priority level. Such cases will be validated out. But if the demand is + * from an app of a different user, preemption is forced even if the apps are + * at the same priority. */ public void validateOutSameAppPriorityFromDemand(Resource cluster, - TreeSet appsOrderedfromLowerPriority) { + TreeSet orderedApps, + Map usersPerPartition, + IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) { - TempAppPerPartition[] apps = appsOrderedfromLowerPriority - .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]); + TempAppPerPartition[] apps = orderedApps + .toArray(new TempAppPerPartition[orderedApps.size()]); if (apps.length <= 0) { return; } - int lPriority = 0; - int hPriority = apps.length - 1; + for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) { - while (lPriority < hPriority - && !apps[lPriority].equals(apps[hPriority]) - && apps[lPriority].getPriority() < apps[hPriority].getPriority()) { - Resource toPreemptFromOther = apps[hPriority] - .getToBePreemptFromOther(); - Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted(); - Resource delta = Resources.subtract(apps[lPriority].toBePreempted, - actuallyToPreempt); + // Check whether the high priority app with demand needs resources from + // another user. + if (Resources.greaterThan(rc, cluster, + apps[hPriority].getToBePreemptFromOther(), Resources.none())) { - if (Resources.greaterThan(rc, cluster, delta, Resources.none())) { - Resource toPreempt = Resources.min(rc, cluster, - toPreemptFromOther, delta); + // Given we have a demand from a high priority app, we can do a reverse + // scan from lower priority apps to select resources. 
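+ // Illustrative example (hypothetical ordering, for clarity only): with + // apps indexed from lowest (apps[0]) to highest priority, a demand from + // apps[n-1] is matched against apps[0] first, then apps[1], and so on. 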
+ // Since idealAssigned of each app has considered user-limit, this logic + // will provide eventual consistency w.r.t user-limit as well. + for (int lPriority = 0; lPriority < apps.length; lPriority++) { - apps[hPriority].setToBePreemptFromOther( - Resources.subtract(toPreemptFromOther, toPreempt)); - apps[lPriority].setActuallyToBePreempted( - Resources.add(actuallyToPreempt, toPreempt)); - } + // Check whether the lower priority app has resources to be preempted. + if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted, + Resources.none())) { - if (Resources.lessThanOrEqual(rc, cluster, - apps[lPriority].toBePreempted, - apps[lPriority].getActuallyToBePreempted())) { - lPriority++; - continue; - } + // If apps are of the same user and of the same (or higher) priority, + // then skip. + if ((apps[hPriority].getUser().equals(apps[lPriority].getUser())) + && (apps[lPriority].getPriority() >= apps[hPriority] + .getPriority())) { + continue; + } - if (Resources.equals(apps[hPriority].getToBePreemptFromOther(), - Resources.none())) { - hPriority--; - continue; + if (Resources.lessThanOrEqual(rc, cluster, + apps[lPriority].toBePreempted, + apps[lPriority].getActuallyToBePreempted()) + || Resources.equals(apps[hPriority].getToBePreemptFromOther(), + Resources.none())) { + continue; + } + + // Ideally, an application with higher priority can force preemption + // of any lower priority app from any user. However, if the admin + // enforces user-limit over priority, the preemption module will not + // choose lower priority apps from users whose user-limit is not yet + // met. + TempUserPerPartition tmpUser = usersPerPartition + .get(apps[lPriority].getUser()); + if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser())) + && (!tmpUser.isUserLimitReached(rc, cluster)) + && (intraQueuePreemptionOrder + .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) { + continue; + } + + Resource toPreemptFromOther = apps[hPriority] + .getToBePreemptFromOther(); + Resource actuallyToPreempt = apps[lPriority] + .getActuallyToBePreempted(); + + // A lower priority app could offer more resource to preempt, if + // multiple higher priority/under-served users need resources. + // After one iteration, we need to ensure that actuallyToPreempt is + // subtracted from the resource to preempt. + Resource preemptableFromLowerPriorityApp = Resources + .subtract(apps[lPriority].toBePreempted, actuallyToPreempt); + + // In case of user-limit preemption, when apps are from different + // users and of the same priority, we will do user-limit preemption + // if there is demand from an app under its UL quota. + // However, this app's demand may be larger. + // Still we should ensure that we do not over-preempt: at most + // (user's used - UL quota) may be preempted. 
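+ // Illustrative example (hypothetical numbers): if a user's usage minus + // AM usage is 30GB, 2GB of its containers are already selected, and + // its user-limit is 20GB, then deltaULQuota = 30 - 2 - 20 = 8GB; at + // most 8GB more may be preempted from this user, however large the + // demand is. 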
+ if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser())) + && (apps[lPriority].getPriority() == apps[hPriority] + .getPriority()) + && tmpUser.isUserLimitReached(rc, cluster)) { + + Resource deltaULQuota = Resources + .subtract(tmpUser.getUsedDeductAM(), tmpUser.selected); + Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit()); + + if (tmpUser.isPreemptionQuotaForULDeltaDone()) { + deltaULQuota = Resources.createResource(0, 0); + } + + if (Resources.lessThan(rc, cluster, deltaULQuota, + preemptableFromLowerPriorityApp)) { + tmpUser.updatePreemptionQuotaForULDeltaAsDone(true); + preemptableFromLowerPriorityApp = deltaULQuota; + } + } + + if (Resources.greaterThan(rc, cluster, + preemptableFromLowerPriorityApp, Resources.none())) { + Resource toPreempt = Resources.min(rc, cluster, + toPreemptFromOther, preemptableFromLowerPriorityApp); + + apps[hPriority].setToBePreemptFromOther( + Resources.subtract(toPreemptFromOther, toPreempt)); + apps[lPriority].setActuallyToBePreempted( + Resources.add(actuallyToPreempt, toPreempt)); + } + } + } } } } @@ -456,6 +557,40 @@ public class FifoIntraQueuePreemptionPlugin Resources.addTo(userAMResource, app.getAMResource(partition)); Resources.addTo(amUsed, app.getAMResource(partition)); } + return amUsed; } + + @Override + public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, + Resource clusterResource, Resource usedResource, RMContainer c) { + // Ensure the below checks: + // 1. This check must be done only when preemption order is USERLIMIT_FIRST + // 2. By selecting container "c", check whether this user's resource usage + // is going below its user-limit. + // 3. The user's used resource must always be greater than the user-limit + // for this check to skip containers. If the used resource is under the + // user-limit, these containers have to be preempted, as the demand might + // be due to high priority apps running under the same user. + String partition = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()).getPartition(); + TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(), + partition); + TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser()); + + // If the given user is not present, skip the check. + if (tmpUser == null) { + return false; + } + + // For ideal resource computations, the user-limit was saved in TempUser + // with the AM's used resource subtracted. Hence it has to be added back + // here for a complete check. 
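+ // Illustrative example (hypothetical numbers): with a stored user-limit + // of 18GB and 2GB of AM usage, the effective limit below is 20GB; the + // container is skipped only if the user's usage minus the container + // size stays at or below 20GB. 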
+ Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed); + + return Resources.lessThanOrEqual(rc, clusterResource, + Resources.subtract(usedResource, c.getAllocatedResource()), userLimit) + && context.getIntraQueuePreemptionOrderPolicy() + .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 289041499d7..44fa736945d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @@ -31,8 +32,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -51,14 +53,14 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { Comparator { @Override - public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { - Priority p1 = Priority.newInstance(tq1.getPriority()); - Priority p2 = Priority.newInstance(tq2.getPriority()); + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Priority p1 = Priority.newInstance(ta1.getPriority()); + Priority p2 = Priority.newInstance(ta2.getPriority()); if (!p1.equals(p2)) { return p1.compareTo(p2); } - return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); } } @@ -121,17 +123,27 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { Map resToObtainByPartition = fifoPreemptionComputePlugin .getResourceDemandFromAppsPerQueue(queueName, partition); - // 6. Based on the selected resource demand per partition, select + // The default preemption iterator considers only FIFO+priority. For + // user-limit preemption, it is possible that a high priority app of + // another user needs resources from some lower priority apps. Hence use + // apps ordered by user-limit starvation as well. + Collection apps = fifoPreemptionComputePlugin + .getPreemptableApps(queueName, partition); + + // 6. 
Get user-limit to ensure that we do not preempt resources which + // would force the user's resource usage to come under its UL. + Map rollingResourceUsagePerUser = new HashMap<>(); + initializeUsageAndUserLimitForCompute(clusterResource, partition, + leafQueue, rollingResourceUsagePerUser); + + // 7. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. try { leafQueue.getReadLock().lock(); - Iterator desc = leafQueue.getOrderingPolicy() - .getPreemptionIterator(); - while (desc.hasNext()) { - FiCaSchedulerApp app = desc.next(); - preemptFromLeastStarvedApp(selectedCandidates, clusterResource, - totalPreemptedResourceAllowed, resToObtainByPartition, - leafQueue, app); + for (FiCaSchedulerApp app : apps) { + preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates, + clusterResource, totalPreemptedResourceAllowed, + resToObtainByPartition, rollingResourceUsagePerUser); } } finally { leafQueue.getReadLock().unlock(); } @@ -142,16 +154,30 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { return selectedCandidates; } - private void preemptFromLeastStarvedApp( + private void initializeUsageAndUserLimitForCompute(Resource clusterResource, + String partition, LeafQueue leafQueue, + Map rollingResourceUsagePerUser) { + for (String user : leafQueue.getAllUsers()) { + // Initialize used resource of a given user for rolling computation. + rollingResourceUsagePerUser.put(user, Resources.clone( + leafQueue.getUser(user).getResourceUsage().getUsed(partition))); + if (LOG.isDebugEnabled()) { + LOG.debug("Rolling resource usage for user:" + user + " is : " + + rollingResourceUsagePerUser.get(user)); + } + } + } + + private void preemptFromLeastStarvedApp(LeafQueue leafQueue, + FiCaSchedulerApp app, Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed, - Map resToObtainByPartition, LeafQueue leafQueue, - FiCaSchedulerApp app) { + Map resToObtainByPartition, + Map rollingResourceUsagePerUser) { // ToDo: Reuse reservation selector here. - List liveContainers = new ArrayList<>( - app.getLiveContainers()); + List liveContainers = new ArrayList<>(app.getLiveContainers()); sortContainers(liveContainers); if (LOG.isDebugEnabled()) { @@ -160,6 +186,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { + totalPreemptedResourceAllowed); } + Resource rollingUsedResourcePerUser = rollingResourceUsagePerUser + .get(app.getUser()); for (RMContainer c : liveContainers) { // if there is no demand, return. @@ -184,12 +212,34 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { continue; } - // Try to preempt this container - CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( - rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedCandidates, totalPreemptedResourceAllowed); - } + // If preempting the selected container would bring the user's resource + // usage to or below its user-limit, we must skip such containers. 
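+ // Illustrative example (hypothetical numbers): rolling usage 12GB, + // effective user-limit 10GB; preempting a 4GB container would leave + // 8GB, below the limit, so the container is skipped and the scan of + // this app's remaining (sorted) containers stops. 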
+ if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app, + clusterResource, rollingUsedResourcePerUser, c)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping container: " + c.getContainerId() + " with resource:" + + c.getAllocatedResource() + " as UserLimit for user:" + + app.getUser() + " with resource usage: " + + rollingUsedResourcePerUser + " is going under UL"); + } + break; + } + // Try to preempt this container + boolean ret = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, selectedCandidates, + totalPreemptedResourceAllowed); + + // Subtract from respective user's resource usage once a container is + // selected for preemption. + if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy() + .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + Resources.subtractFrom(rollingUsedResourcePerUser, + c.getAllocatedResource()); + } + } } private void computeIntraQueuePreemptionDemand(Resource clusterResource, @@ -205,12 +255,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { continue; } - // 2. Its better to get partition based resource limit earlier before - // starting calculation - Resource partitionBasedResource = - context.getPartitionResource(partition); - - // 3. loop through all queues corresponding to a partition. + // 2. loop through all queues corresponding to a partition. for (String queueName : queueNames) { TempQueuePerPartition tq = context.getQueueByPartition(queueName, partition); @@ -221,23 +266,22 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { continue; } - // 4. Consider reassignableResource as (used - actuallyToBePreempted). + // 3. Consider reassignableResource as (used - actuallyToBePreempted). // This provides an upper limit for splitting the apps' quota in a queue. Resource queueReassignableResource = Resources.subtract(tq.getUsed(), tq.getActuallyToBePreempted()); - // 5. Check queue's used capacity. Make sure that the used capacity is + // 4. Check queue's used capacity. Make sure that the used capacity is // above certain limit to consider for intra queue preemption. if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context .getMinimumThresholdForIntraQueuePreemption()) { continue; } - // 6. compute the allocation of all apps based on queue's unallocated + // 5. 
compute the allocation of all apps based on queue's unallocated // capacity fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource, - partitionBasedResource, tq, selectedCandidates, - totalPreemptedResourceAllowed, + tq, selectedCandidates, totalPreemptedResourceAllowed, queueReassignableResource, context.getMaxAllowableLimitForIntraQueuePreemption()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java index 93ebe65049d..56fd007d40b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.Collection; import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; interface IntraQueuePreemptionComputePlugin { @@ -32,8 +34,14 @@ interface IntraQueuePreemptionComputePlugin { String partition); void computeAppsIdealAllocation(Resource clusterResource, - Resource partitionBasedResource, TempQueuePerPartition tq, + TempQueuePerPartition tq, Map> selectedCandidates, Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned, float maxAllowablePreemptLimit); + + Collection getPreemptableApps(String queueName, + String partition); + + boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app, + Resource clusterResource, Resource usedResource, RMContainer c); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index dc6f1c205da..76d6637a3fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -79,6 +80,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; */ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext { + + /** + * IntraQueuePreemptionOrderPolicy defines the preemption orderings which + * can be configured by the admin. + */ + @Unstable + public enum IntraQueuePreemptionOrderPolicy { + PRIORITY_FIRST, USERLIMIT_FIRST; + } + private static final Log LOG = LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class); @@ -95,6 +106,7 @@ public class ProportionalCapacityPreemptionPolicy private float maxAllowableLimitForIntraQueuePreemption; private float minimumThresholdForIntraQueuePreemption; + private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; // Pointer to other RM components private RMContext rmContext; @@ -190,6 +202,13 @@ public class ProportionalCapacityPreemptionPolicy CapacitySchedulerConfiguration. DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); + intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy + .valueOf(csConfig + .get( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) + .toUpperCase()); + rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); @@ -242,7 +261,6 @@ public class ProportionalCapacityPreemptionPolicy } } - @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( Map> selectedCandidates, long currentTime) { @@ -652,4 +670,9 @@ public class ProportionalCapacityPreemptionPolicy } underServedQueues.add(queueName); } + + @Override + public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() { + return intraQueuePreemptionOrderPolicy; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index fccd2a74732..e9a934b8403 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -91,8 +91,12 @@ public class TempAppPerPartition extends AbstractPreemptionEntity { return applicationId; } + public String getUser() { + return this.app.getUser(); + } + public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, - Resource cluster, Resource toBeDeduct, String partition) { + Resource cluster, Resource toBeDeduct) { if (Resources.greaterThan(resourceCalculator, cluster, getActuallyToBePreempted(), toBeDeduct)) { Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 7eab015bbaa..89452f9c0d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; /** * Temporary data-structure tracking resource availability, pending resource @@ -59,6 +61,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { int relativePriority = 0; TempQueuePerPartition parent = null; + // Holds a temp per-user data structure, tracking each user's user-limit, + // idealAssigned, used etc. + Map usersPerPartition = new LinkedHashMap<>(); + TempQueuePerPartition(String queueName, Resource current, boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, @@ -289,4 +295,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { return apps; } + public void addUserPerPartition(String userName, + TempUserPerPartition tmpUser) { + this.usersPerPartition.put(userName, tmpUser); + } + + public Map getUsersPerPartition() { + return usersPerPartition; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java new file mode 100644 index 00000000000..33ee18f1c22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Temporary data-structure tracking resource availability, pending resource + * need and current utilization for a user. + */ +public class TempUserPerPartition extends AbstractPreemptionEntity { + + private final User user; + private Resource userLimit; + private boolean donePreemptionQuotaForULDelta = false; + + TempUserPerPartition(User user, String queueName, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + super(queueName, usedPerPartition, amUsedPerPartition, reserved, + pendingPerPartition); + this.user = user; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed()) + .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) + .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ") + .append(getUserLimit()).append(" IDEAL_ASSIGNED: ") + .append(idealAssigned).append(" USED_WO_AMUSED: ") + .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ") + .append(toBePreempted).append(" ACTUAL_PREEMPT: ") + .append(getActuallyToBePreempted()).append("\n"); + + return sb.toString(); + } + + public String getUserName() { + return user.getUserName(); + } + + public Resource getUserLimit() { + return userLimit; + } + + public void setUserLimit(Resource userLimitResource) { + this.userLimit = userLimitResource; + } + + public boolean isUserLimitReached(ResourceCalculator rc, + Resource clusterResource) { + if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(), + userLimit)) { + return true; + } + return false; + } + + public boolean isPreemptionQuotaForULDeltaDone() { + return this.donePreemptionQuotaForULDelta; + } + + public void updatePreemptionQuotaForULDeltaAsDone(boolean done) { + this.donePreemptionQuotaForULDelta = done; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 9fb92ec1991..c3c958519de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1233,6 +1233,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = 0.2f; + /** + * For intra-queue preemption, enforce a preemption order such as + * "userlimit_first" or "priority_first". 
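+ * Assuming the standard capacity-scheduler preemption prefixes, the fully + * qualified property would be + * yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.preemption-order-policy, + * with "userlimit_first" as the default (see below). 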
+ */ + public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX + + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy"; + public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; + /** * Maximum application for a queue to be used when application per queue is * not defined. To be consistent with the previous version, the default value is set diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9059ef0936e..104e95e4eda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -43,12 +43,10 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -56,7 +54,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -2022,4 +2019,12 @@ public class LeafQueue extends AbstractCSQueue { writeLock.unlock(); } } + + /** + * Get all valid users in this queue. 
+ * @return user list + */ + public Set getAllUsers() { + return this.getUsersManager().getUsers().keySet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index c2134eb3309..579c4c7a1be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -253,6 +253,15 @@ public class UsersManager implements AbstractUsersManager { public void setUserResourceLimit(Resource userResourceLimit) { this.userResourceLimit = userResourceLimit; } + + public String getUserName() { + return this.userName; + } + + @VisibleForTesting + public void setResourceUsage(ResourceUsage resourceUsage) { + this.userResourceUsage = resourceUsage; + } } /* End of User class */ /** @@ -344,7 +353,7 @@ public class UsersManager implements AbstractUsersManager { /* * Get all users of queue. */ - private Map getUsers() { + public Map getUsers() { return users; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a9e97fda500..4fc0ea490b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -96,6 +97,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { Clock mClock = null; CapacitySchedulerConfiguration conf = null; CapacityScheduler cs = null; + @SuppressWarnings("rawtypes") EventHandler mDisp = null; ProportionalCapacityPreemptionPolicy policy = null; Resource clusterResource = null; @@ -247,6 +249,7 @@ public class 
@@ -247,6 +249,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { if (containerId == 1) { when(rmc.isAMContainer()).thenReturn(true); when(app.getAMResource(label)).thenReturn(res); + when(app.getAppAMNodePartitionName()).thenReturn(label); } if (reserved) { @@ -280,6 +283,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { containerId++; } + // If the app has 0 containers and only pending resources, still make + // sure to update the label. + if (repeat == 0) { + when(app.getAppAMNodePartitionName()).thenReturn(label); + } + // Some more app specific aggregated data can be better filled here. when(app.getPriority()).thenReturn(pri); when(app.getUser()).thenReturn(userName); @@ -315,10 +324,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { private void mockApplications(String appsConfig) { int id = 1; HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>(); + HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>(); LeafQueue queue = null; + int mulp = -1; for (String a : appsConfig.split(";")) { String[] strs = a.split("\t"); String queueName = strs[0]; + if (mulp <= 0 && strs.length > 2 && strs[2] != null) { + mulp = 100 / (new Integer(strs[2]).intValue()); + } // get containers List<RMContainer> liveContainers = new ArrayList<RMContainer>(); @@ -338,6 +352,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationId()).thenReturn(appId); + when(app.getQueueName()).thenReturn(queueName); // add to LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); @@ -349,23 +364,71 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { users = new HashSet<String>(); userMap.put(queueName, users); } - users.add(app.getUser()); + + String label = app.getAppAMNodePartitionName(); + + // Get label to queue + HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel + .get(label); + if (null == userResourceUsagePerQueue) { + userResourceUsagePerQueue = new HashMap<>(); + userResourceUsagePerLabel.put(label, userResourceUsagePerQueue); + } + + // Get queue to user-based resource map + HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue + .get(queueName); + if (null == userResourceUsage) { + userResourceUsage = new HashMap<>(); + userResourceUsagePerQueue.put(queueName, userResourceUsage); + } + + // Get user to its resource usage. + ResourceUsage usage = userResourceUsage.get(app.getUser()); + if (null == usage) { + usage = new ResourceUsage(); + userResourceUsage.put(app.getUser(), usage); + } + + usage.incAMUsed(app.getAMResource(label)); + usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label)); id++; } - for (String queueName : userMap.keySet()) { - queue = (LeafQueue) nameToCSQueues.get(queueName); - // Currently we have user-limit test support only for default label.
- Resource totResoucePerPartition = partitionToResource.get(""); - Resource capacity = Resources.multiply(totResoucePerPartition, - queue.getQueueCapacities().getAbsoluteCapacity()); - HashSet<String> users = userMap.get(queue.getQueueName()); - Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); - for (String userName : users) { - when(queue.getResourceLimitForAllUsers(eq(userName), - any(Resource.class), anyString(), any(SchedulingMode.class))) - .thenReturn(userLimit); + for (String label : userResourceUsagePerLabel.keySet()) { + for (String queueName : userMap.keySet()) { + queue = (LeafQueue) nameToCSQueues.get(queueName); + // Currently we have user-limit test support only for default label. + Resource totResoucePerPartition = partitionToResource.get(""); + Resource capacity = Resources.multiply(totResoucePerPartition, + queue.getQueueCapacities().getAbsoluteCapacity()); + HashSet<String> users = userMap.get(queue.getQueueName()); + when(queue.getAllUsers()).thenReturn(users); + Resource userLimit; + if (mulp > 0) { + userLimit = Resources.divideAndCeil(rc, capacity, mulp); + } else { + userLimit = Resources.divideAndCeil(rc, capacity, + users.size()); + } + LOG.debug("Updating user-limit from mock: totResoucePerPartition=" + + totResoucePerPartition + ", capacity=" + capacity + + ", users.size()=" + users.size() + ", userlimit= " + userLimit + + ",label= " + label + ",queueName= " + queueName); + + HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel + .get(label).get(queueName); + for (String userName : users) { + User user = new User(userName); + if (userResourceUsage != null) { + user.setResourceUsage(userResourceUsage.get(userName)); + } + when(queue.getUser(eq(userName))).thenReturn(user); + when(queue.getResourceLimitForAllUsers(eq(userName), + any(Resource.class), anyString(), any(SchedulingMode.class))) + .thenReturn(userLimit); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java index bf83e1c7387..6c5aa670d44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -62,12 +62,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue * Apps which are running at low priority (4) will preempt few of its * resources to meet the demand.
*/ + + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); + String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label "n1= res=100"; String queuesConfig = // guaranteed,max,used,pending,reserved - "root(=[100 100 80 120 0]);" + // root + "root(=[100 100 79 120 0]);" + // root "-a(=[11 100 11 50 0]);" + // a "-b(=[40 100 38 60 0]);" + // b "-c(=[20 100 10 10 0]);" + // c @@ -304,6 +308,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -357,6 +363,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue // report "ideal" preempt as 10%. Ensure preemption happens only for 10% conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -411,6 +419,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -418,7 +428,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue String queuesConfig = // guaranteed,max,used,pending,reserved "root(=[100 100 95 170 0]);" + // root - "-a(=[60 100 70 50 0]);" + // a + "-a(=[60 100 70 35 0]);" + // a "-b(=[40 100 25 120 0])"; // b String appsConfig = @@ -467,6 +477,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -516,6 +528,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue * cycle. Eventhough there are more demand and no other low priority * apps are present, still AM contaier need to soared. */ + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -660,6 +674,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;" + // default partition "x=100,true"; // partition=x @@ -720,6 +736,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. 
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;"; String nodesConfig = // n1 has no label @@ -840,8 +858,10 @@ TestProportionalCapacityPreemptionPolicyIntraQueue policy.editSchedule(); // Considering user-limit of 50% since only 2 users are there, only preempt - // 15 more (5 is already running) eventhough demand is for 30. - verify(mDisp, times(15)).handle(argThat( + // 14 more (5 is already running) even though demand is for 30. Ideally we + // would preempt 15, but the 15th container would bring user1's usage to 20, + // which is the same as the user-limit. Hence skip the 15th container. + verify(mDisp, times(14)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(3)))); } @@ -869,6 +889,8 @@ TestProportionalCapacityPreemptionPolicyIntraQueue conf.setFloat(CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, (float) 0.5); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); String labelsConfig = "=100,true;" + // default partition "x=100,true"; // partition=x
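The 15-versus-14 adjustment above is the user-limit deadzone: the selector stops one container short of dragging a user down to its user-limit. Conceptually the guard looks like the following sketch (names such as userUsed, containerRes and userLimit are illustrative, not the patch's exact code):

    // Skip a candidate container if taking it would leave the user at or
    // below its computed user-limit (the "deadzone" guard).
    Resource afterPreemption = Resources.subtract(userUsed, containerRes);
    if (Resources.lessThanOrEqual(rc, clusterResource, afterPreemption,
        userLimit)) {
      continue; // preempting this one would breach the deadzone
    }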
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java new file mode 100644 index 00000000000..7df52f9e4a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java @@ -0,0 +1,899 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test class for IntraQueuePreemption scenarios. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsers() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Preconditions: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 100 | 0 | + * | app2 | user2 | 1 | 0 | 30 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and it's already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 30 0]);" + // root + "-a(=[100 100 100 30 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources and it's well under its user-limit. Hence + // preempt resources from app1. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithSingleUser() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 100 | 0 | + * | app2 | user1 | 1 | 0 | 30 | + * +--------------+----------+------+---------+ + * Given a single user, lower priority/late-submitted apps have to + * wait. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 30 0]);" + // root + "-a(=[100 100 100 30 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources, but app1 and app2 are from the same user, so + // there won't be any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 50 | 0 | + * | app2 | user2 | 1 | 30 | 30 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and it's already using 50. app2 from user2 has a demand + * of 30, and UL is 50. Since no user is above its UL, there should not be + * any preemption. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 30 0]);" + // root + "-a(=[100 100 80 30 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,50,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,30,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources, but no user is above its user-limit, so there + // won't be any preemption. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 2 | 100 | 0 | + * | app2 | user2 | 1 | 0 | 30 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. app1 of high priority + * has a demand of 0 and it's already using 100. app2 from user2 has a demand + * of 30, and UL is 50. 30 would be preempted from app1. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 30 0]);" + // root + "-a(=[100 100 100 30 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(2,1,n1,,100,false,0,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,0,false,30,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources and it's well under its user-limit. Hence + // preempt resources from app1 even though its priority is higher than + // app2's. + verify(mDisp, times(30)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWithMultipleApps() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 30 | 30 | + * | app2 | user2 | 1 | 20 | 20 | + * | app3 | user1 | 1 | 30 | 30 | + * | app4 | user2 | 1 | 0 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 90 0]);" + // root + "-a(=[100 100 80 90 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,30,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(1,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 need more resources and are well under user2's user-limit. + // Hence preempt resources from app3 (picked over app1 from the same user). + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testNoPreemptionOfUserLimitWithMultipleAppsAndSameUser() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 30 | 30 | + * | app2 | user1 | 1 | 20 | 20 | + * | app3 | user1 | 1 | 30 | 30 | + * | app4 | user1 | 1 | 0 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 90 0]);" + // root + "-a(=[100 100 80 90 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,30,false,20,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user1);" + + "a\t" // app3 in a + + "(1,1,n1,,30,false,30,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user1)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // All apps belong to the same user (user1), so no preemption should + // happen across them. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testIntraQueuePreemptionOfUserLimitWithAppsOfDifferentPriority() + throws IOException { + /** + * Queue structure is:
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 3 | 30 | 30 | + * | app2 | user2 | 1 | 20 | 20 | + * | app3 | user1 | 4 | 30 | 0 | + * | app4 | user2 | 1 | 0 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 80 60 0]);" + // root + "-a(=[100 100 80 60 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(3,1,n1,,30,false,30,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + "a\t" // app3 in a + + "(4,1,n1,,30,false,0,user1);" + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2/app4 need more resources and are well under their user-limit. Hence + // preempt resources from app1 (compared to app3, app1 has lower priority). + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + }
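Working the numbers behind the times(9) assertion above, as a quick comment-style check (all values come from the appsConfig in this test):

    // user1 usage = 30 (app1) + 30 (app3) = 60; user-limit = 50% of 100 = 50
    // over-limit  = 60 - 50 = 10 containers
    // the deadzone keeps the container that would land user1 exactly at 50,
    // so 9 containers are preempted, all from app1 (priority 3 < app3's 4)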
+ + @Test + public void testIntraQueuePreemptionOfUserLimitInTwoQueues() + throws IOException { + /** + * Queue structure is: + * + * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+ * + * Guaranteed resources of a/b are 60:40; total cluster resource = 100. + * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify + * that intra-queue preemption could occur in two queues when user-limit + * irregularity is present in the queues. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 90 80 0]);" + // root + "-a(=[60 100 55 60 0]);" + // a + "-b(=[40 100 35 20 0])"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(3,1,n1,,20,false,30,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,20,false,20,user2);" + + "a\t" // app3 in a + + "(4,1,n1,,15,false,0,user1);" + + "a\t" // app4 in a + + "(1,1,n1,,0,false,10,user2);" + + "b\t" // app5 in b + + "(3,1,n1,,25,false,10,user1);" + + "b\t" // app6 in b + + "(1,1,n1,,10,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // user1 is above its user-limit in both queues. Hence preempt from its + // lowest-priority app in each queue (app1 in a, app5 in b). + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + } + + @Test + public void testIntraQueuePreemptionWithTwoRequestingUsers() + throws IOException { + /** + * Queue structure is: + *
+    * <pre>
+    *       root
+    *        |
+    *        a
+    * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 60 | 10 | + * | app2 | user2 | 1 | 40 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 20 0]);" + // root + "-a(=[100 100 100 20 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,1,n1,,60,false,10,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,40,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources and it's well under its user-limit. Hence + // preempt resources from app1. + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testNoIntraQueuePreemptionIfBelowUserLimitAndLowPriorityExtraUsers() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Preconditions: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 50 | 0 | + * | app2 | user2 | 1 | 50 | 0 | + * | app3 | user3 | 0 | 0 | 10 | + * +--------------+----------+------+---------+ + * This scenario should never preempt from either user1 or user2 + */ + + // Set max preemption per round to 70% (this is different from minimum user + // limit percent). + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.7); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 10 0]);" + // root + "-a(=[100 100 100 10 0])"; // a + + String appsConfig = + // queueName\t\ + // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP; + "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1 + "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2 + "a\t(0,1,n1,,0,false,10,user3)\t50"; // app3, user3 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // user1 and user2 are each exactly at their user-limit. Hence no + // preemption should happen from either of them. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + }
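For readers decoding the appsConfig entries used throughout these tests: the format is queueName\t(priority,resource,host,label,#repeat,reserved,pending,user), optionally followed by \tMULP, the minimum-user-limit-percent. Taking the first entry above apart:

    // "a\t(1,1,n1,,50,false,0,user1)\t50" decodes to:
    //   queue "a", priority 1, container size 1 (memory), host n1,
    //   default label, 50 running containers, not reserved, 0 pending,
    //   owned by user1, minimum-user-limit-percent 50
    // The mock framework turns MULP into a divisor: mulp = 100 / 50 = 2, so
    // the stubbed user-limit becomes divideAndCeil(queueCapacity, 2).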
+ + @Test + public void testNoIntraQueuePreemptionIfBelowUserLimitAndSamePriorityExtraUsers() + throws IOException { + /** + * Queue structure is: + * + * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Preconditions: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 50 | 0 | + * | app2 | user2 | 1 | 50 | 0 | + * | app3 | user3 | 1 | 0 | 10 | + * +--------------+----------+------+---------+ + * This scenario should never preempt from either user1 or user2 + */ + + // Set max preemption per round to 70% (this is different from minimum user + // limit percent). + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.7); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 10 0]);" + // root + "-a(=[100 100 100 10 0])"; // a + + String appsConfig = + // queueName\t\ + // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP; + "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1 + "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2 + "a\t(1,1,n1,,0,false,10,user3)\t50"; // app3, user3 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // user1 and user2 are each exactly at their user-limit. Hence no + // preemption should happen from either of them. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testNoIntraQueuePreemptionIfBelowUserLimitAndHighPriorityExtraUsers() + throws IOException { + /** + * Queue structure is: + *
+     * <pre>
+     *       root
+     *        |
+     *        a
+     * </pre>
+ * + * Scenario: + * Preconditions: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 50 | 0 | + * | app2 | user2 | 1 | 50 | 0 | + * | app3 | user3 | 5 | 0 | 10 | + * +--------------+----------+------+---------+ + * This scenario should never preempt from either user1 or user2 + */ + + // Set max preemption per round to 70% (this is different from minimum user + // limit percent). + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.7); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 10 0]);" + // root + "-a(=[100 100 100 10 0])"; // a + + String appsConfig = + // queueName\t\ + // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP; + "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1 + "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2 + "a\t(5,1,n1,,0,false,10,user3)\t50"; // app3, user3 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // user1 and user2 are each exactly at their user-limit. Hence no + // preemption should happen from either of them. + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testNoIntraQueuePreemptionWithUserLimitDeadzone() + throws IOException { + /** + * Queue structure is: + *
+    * <pre>
+    *       root
+    *        |
+    *        a
+    * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 60 | 10 | + * | app2 | user2 | 1 | 40 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps, but also ensure that the + * user's usage does not fall below its user-limit. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 20 0]);" + // root + "-a(=[100 100 100 20 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,3,n1,,20,false,10,user1);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,40,false,10,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources and it's well under its user-limit. Hence + // preempt 3 containers (9GB) from app1. We will not preempt the last + // container as it would pull user1's usage below its user-limit. + verify(mDisp, times(3)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionWithUserLimitDeadzoneAndPriority() + throws IOException { + /** + * Queue structure is: + *
+    * <pre>
+    *       root
+    *        |
+    *        a
+    * </pre>
+ * + * Scenario: + * Queue total resources: 100 + * Minimum user limit percent: 50% + * +--------------+----------+------+---------+ + * | APP | USER | PRIORITY | USED | PENDING | + * +--------------+----------+------+---------+ + * | app1 | user1 | 1 | 60 | 10 | + * | app2 | user2 | 1 | 40 | 10 | + * +--------------+----------+------+---------+ + * Hence in queueA of 100, each user has a quota of 50. Now have multiple + * apps and check for preemption across apps, but also ensure that the + * user's usage does not fall below its user-limit. + */ + + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 20 0]);" + // root + "-a(=[100 100 100 20 0])"; // a + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,3,n1,,20,false,10,user1);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,0,false,10,user1);" + // app2 a + "a\t" // app3 in a + + "(1,1,n1,,40,false,20,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 needs more resources and it's well under its user-limit. Hence + // preempt 3 containers (9GB) from app1. We will not preempt the last + // container as it would pull user1's usage below its user-limit. + verify(mDisp, times(3)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // After the first round, 3 containers were preempted from app1, and the + // resource distribution will be as below. + appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved,pending) + "a\t" // app1 in a + + "(1,3,n1,,17,false,10,user1);" + // app1 a + "a\t" // app2 in a + + "(2,1,n1,,0,false,10,user1);" + // app2 a + "a\t" // app3 in a + + "(1,1,n1,,49,false,11,user2)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app2 has higher-priority demand within the same user 'user1'. However, + // user1's usage is already under its UL. Hence no preemption. We will + // still see 3 containers while asserting, as they were already selected + // in the earlier round. + verify(mDisp, times(3)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } +}
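Both new test suites toggle the ordering knob introduced by this patch. A hedged configuration sketch showing the same call the tests make ("userlimit_first" is the default, per the constant added earlier in this patch; "priority_first" is what the priority-oriented tests set):

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // "userlimit_first" resolves user-limit violations before app priority;
    // "priority_first" favors application priority instead.
    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
        "userlimit_first");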
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java new file mode 100644 index 00000000000..7784549a34a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for IntraQueuePreemption scenarios. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF + extends + ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + rc = new DominantResourceCalculator(); + when(cs.getResourceCalculator()).thenReturn(rc); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testSimpleIntraQueuePreemptionWithVCoreResource() + throws IOException { + /** + * The simplest preemption test. Queue structure is: + *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+ * + * Guaranteed resources of a/b/c/d are 10:40:20:30; total cluster resource = + * 100. Scenario: queue B has a few running apps and two high-priority apps + * have demand. Apps running at low priority (4) will have a few of their + * resources preempted to meet the demand. + */ + + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); + + String labelsConfig = "=100:200,true;"; + String nodesConfig = // n1 has no label + "n1= res=100:200"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:50 100:50 80:40 120:60 0]);" + // root + "-a(=[10:5 100:50 10:5 50:25 0]);" + // a + "-b(=[40:20 100:50 40:20 60:30 0]);" + // b + "-c(=[20:10 100:50 10:5 10:5 0]);" + // c + "-d(=[30:15 100:50 20:10 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1:1,n1,,5,false,25:25);" + // app1 a + "a\t" // app2 in a + + "(1,1:1,n1,,5,false,25:25);" + // app2 a + "b\t" // app3 in b + + "(4,1:1,n1,,36,false,20:20);" + // app3 b + "b\t" // app4 in b + + "(4,1:1,n1,,2,false,10:10);" + // app4 b + "b\t" // app5 in b + + "(5,1:1,n1,,1,false,10:10);" + // app5 b + "b\t" // app6 in b + + "(6,1:1,n1,,1,false,10:10);" + // app6 in b + "c\t" // app7 in c + + "(1,1:1,n1,,10,false,10:10);" + "d\t" // app8 in d + + "(1,1:1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority. Hence take 8 + // containers from them by hitting the intraQueuePreemptionDemand of 20%. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionWithDominantVCoreResource() + throws IOException { + /** + * The simplest preemption test. Queue structure is: + *
+     * <pre>
+     *     root
+     *     /  \
+     *    a    b
+     * </pre>
+ * + * Guaranteed resources of a/b are 40:60; total cluster resource = 100. + * Scenario: queue B has a few running apps and two high-priority apps have + * demand. Apps running at low priority (4) will have a few of their + * resources preempted to meet the demand. + */ + + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); + // Set max preemption limit as 50%. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 0.5); + + String labelsConfig = "=100:200,true;"; + String nodesConfig = // n1 has no label + "n1= res=100:200"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:50 100:50 50:40 110:60 0]);" + // root + "-a(=[40:20 100:50 9:9 50:30 0]);" + // a + "-b(=[60:30 100:50 40:30 60:30 0]);"; // b + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,2:1,n1,,4,false,25:25);" + // app1 a + "a\t" // app2 in a + + "(1,1:3,n1,,2,false,25:25);" + // app2 a + "b\t" // app3 in b + + "(4,2:1,n1,,10,false,20:20);" + // app3 b + "b\t" // app4 in b + + "(4,1:2,n1,,5,false,10:10);" + // app4 b + "b\t" // app5 in b + + "(5,1:1,n1,,5,false,30:20);" + // app5 b + "b\t" // app6 in b + + "(6,2:1,n1,,5,false,30:20);"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority; preempt containers + // from them, and a few from app5 as well, per the verifications below. + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(4)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(5)))); + } +}
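Since the DRF suite swaps in DominantResourceCalculator, the user-limit and preemption comparisons above operate on the dominant share of each memory:vcores pair rather than on memory alone. A worked example with this file's 100:200 cluster (illustrative arithmetic only):

    // Dominant share of one 2:1 (memory:vcores) container on a 100:200 cluster:
    double memShare  = 2.0 / 100.0;                   // 0.02
    double coreShare = 1.0 / 200.0;                   // 0.005
    double dominant  = Math.max(memShare, coreShare); // memory dominates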