From 514794e1a5a39ca61de3981d53a05547ae17f5e4 Mon Sep 17 00:00:00 2001 From: Carlo Curino Date: Thu, 22 Feb 2018 18:12:12 -0800 Subject: [PATCH] YARN-7934. [GQ] Refactor preemption calculators to allow overriding for Federation Global Algos. (Contributed by curino) --- ...AbstractPreemptableResourceCalculator.java | 38 ++++-- .../capacity/AbstractPreemptionEntity.java | 4 + .../CapacitySchedulerPreemptionContext.java | 6 +- .../PreemptableResourceCalculator.java | 21 ++-- .../capacity/TempQueuePerPartition.java | 108 ++++++++++++++---- .../webapp/dao/ResourceInfo.java | 5 +- 6 files changed, 140 insertions(+), 42 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 5196831e2ad..258997083db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -18,6 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; @@ -26,12 +32,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.PriorityQueue; - /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector}. @@ -126,11 +126,18 @@ protected void computeFixpointAllocation(Resource totGuarant, TempQueuePerPartition q = i.next(); Resource used = q.getUsed(); + Resource initIdealAssigned; if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) { - q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra); + initIdealAssigned = + Resources.add(q.getGuaranteed(), q.untouchableExtra); } else { - q.idealAssigned = Resources.clone(used); + initIdealAssigned = Resources.clone(used); } + + // perform initial assignment + initIdealAssignment(totGuarant, q, initIdealAssigned); + + Resources.subtractFrom(unassigned, q.idealAssigned); // If idealAssigned < (allocated + used + pending), q needs more // resources, so @@ -188,6 +195,21 @@ protected void computeFixpointAllocation(Resource totGuarant, } } + + /** + * This method is visible to allow sub-classes to override the initialization + * behavior. + * + * @param totGuarant total resources (useful for {@code ResourceCalculator} + * operations) + * @param q the {@code TempQueuePerPartition} being initialized + * @param initIdealAssigned the proposed initialization value. + */ + protected void initIdealAssignment(Resource totGuarant, + TempQueuePerPartition q, Resource initIdealAssigned) { + q.idealAssigned = initIdealAssigned; + } + /** * Computes a normalizedGuaranteed capacity based on active queues. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java index dbd1f0a6da0..cb4d7af769d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -59,6 +59,10 @@ public class AbstractPreemptionEntity { this.selected = Resource.newInstance(0, 0); } + public String getQueueName() { + return queueName; + } + public Resource getUsed() { return current; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index d6f3f6c6d42..098acdd8510 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -30,7 +30,11 @@ import java.util.LinkedHashSet; import java.util.Set; -interface CapacitySchedulerPreemptionContext { +/** + * This interface provides context for the calculation of ideal allocation + * and preemption for the {@code CapacityScheduler}. + */ +public interface CapacitySchedulerPreemptionContext { CapacityScheduler getScheduler(); TempQueuePerPartition getQueueByPartition(String queueName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 907785e437d..2d2cdf6d197 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Resource; @@ -26,11 +31,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector} @@ -70,7 +70,7 @@ public PreemptableResourceCalculator( * @param totalPreemptionAllowed total amount of preemption we allow * @param tot_guarant the amount of capacity assigned to this pool of queues */ - private void computeIdealResourceDistribution(ResourceCalculator rc, + protected void computeIdealResourceDistribution(ResourceCalculator rc, List queues, Resource totalPreemptionAllowed, Resource tot_guarant) { @@ -138,14 +138,13 @@ private void computeIdealResourceDistribution(ResourceCalculator rc, /** * This method recursively computes the ideal assignment of resources to each * level of the hierarchy. This ensures that leafs that are over-capacity but - * with parents within capacity will not be preemptionCandidates. Preemptions are allowed - * within each subtree according to local over/under capacity. + * with parents within capacity will not be preemptionCandidates. Preemptions + * are allowed within each subtree according to local over/under capacity. * * @param root the root of the cloned queue hierachy * @param totalPreemptionAllowed maximum amount of preemption allowed - * @return a list of leaf queues updated with preemption targets */ - private void recursivelyComputeIdealAssignment( + protected void recursivelyComputeIdealAssignment( TempQueuePerPartition root, Resource totalPreemptionAllowed) { if (root.getChildren() != null && root.getChildren().size() > 0) { @@ -242,7 +241,7 @@ public void computeIdealAllocation(Resource clusterResource, // compute the ideal distribution of resources among queues // updates cloned queues state accordingly - tRoot.idealAssigned = tRoot.getGuaranteed(); + tRoot.initializeRootIdealWithGuarangeed(); recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index fdeee5273f1..9d8297d5590 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -18,22 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .ParentQueue; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; -import org.apache.hadoop.yarn.util.resource.Resources; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; + /** * Temporary data-structure tracking resource availability, pending resource * need, current utilization. This is per-queue-per-partition data structure @@ -74,7 +72,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { // idealAssigned, used etc. Map usersPerPartition = new LinkedHashMap<>(); - TempQueuePerPartition(String queueName, Resource current, + @SuppressWarnings("checkstyle:parameternumber") + public TempQueuePerPartition(String queueName, Resource current, boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, Resource reserved, CSQueue queue, Resource effMinRes, @@ -94,7 +93,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { pendingDeductReserved = Resources.createResource(0); } - if (ParentQueue.class.isAssignableFrom(queue.getClass())) { + if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) { parentQueue = (ParentQueue) queue; } @@ -179,15 +178,14 @@ Resource offer(Resource avail, ResourceCalculator rc, // Because for a satisfied parent queue, it could have some under-utilized // leaf queues. Such under-utilized leaf queue could preemption resources // from over-utilized leaf queue located at other hierarchies. - if (null == children || children.isEmpty()) { - Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract( - Resources.max(rc, clusterResource, getUsed(), getGuaranteed()), - idealAssigned); - maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, - maxOfGuranteedAndUsedDeductAssigned, Resources.none()); - accepted = Resources.min(rc, clusterResource, accepted, - maxOfGuranteedAndUsedDeductAssigned); - } + + accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + + // accepted so far contains the "quota acceptable" amount, we now filter by + // locality acceptable + + accepted = acceptedByLocality(rc, accepted); + Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; @@ -329,4 +327,72 @@ public void addUserPerPartition(String userName, public Map getUsersPerPartition() { return usersPerPartition; } + + public void setPending(Resource pending) { + this.pending = pending; + } + + public Resource getIdealAssigned() { + return idealAssigned; + } + + public String toGlobalString() { + StringBuilder sb = new StringBuilder(); + sb.append("\n").append(toString()); + for (TempQueuePerPartition c : children) { + sb.append(c.toGlobalString()); + } + return sb.toString(); + } + + /** + * This method is visible to allow sub-classes to override the behavior, + * specifically to take into account locality-based limitations of how much + * the queue can consumed. + * + * @param rc the ResourceCalculator to be used. + * @param offered the input amount of Resource offered to this queue. + * + * @return the subset of Resource(s) that the queue can consumed after + * accounting for locality effects. + */ + protected Resource acceptedByLocality(ResourceCalculator rc, + Resource offered) { + return offered; + } + + /** + * This method is visible to allow sub-classes to override the behavior, + * specifically for federation purposes we do not want to cap resources as it + * is done here. + * + * @param rc the {@code ResourceCalculator} to be used + * @param clusterResource the total cluster resources + * @param offered the resources offered to this queue + * @return the amount of resources accepted after considering max and + * deducting assigned. + */ + protected Resource filterByMaxDeductAssigned(ResourceCalculator rc, + Resource clusterResource, Resource offered) { + if (null == children || children.isEmpty()) { + Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract( + Resources.max(rc, clusterResource, getUsed(), getGuaranteed()), + idealAssigned); + maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, + maxOfGuranteedAndUsedDeductAssigned, Resources.none()); + offered = Resources.min(rc, clusterResource, offered, + maxOfGuranteedAndUsedDeductAssigned); + } + return offered; + } + + /** + * This method is visible to allow sub-classes to ovverride the behavior, + * specifically for federation purposes we need to initialize per-sub-cluster + * roots as well as the global one. + */ + protected void initializeRootIdealWithGuarangeed() { + idealAssigned = Resources.clone(getGuaranteed()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java index 5bed936d402..9a335e90721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java @@ -70,7 +70,7 @@ public int getvCores() { @Override public String toString() { - return resources.toString(); + return getResource().toString(); } public void setMemory(int memory) { @@ -90,6 +90,9 @@ public void setvCores(int vCores) { } public Resource getResource() { + if (resources == null) { + resources = Resource.newInstance(memory, vCores); + } return Resource.newInstance(resources); }