YARN-7934. [GQ] Refactor preemption calculators to allow overriding for Federation Global Algos. (Contributed by curino)

This commit is contained in:
Carlo Curino 2018-02-22 18:12:12 -08:00
parent 95904f6b3c
commit 514794e1a5
6 changed files with 140 additions and 42 deletions

View File

@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
@ -26,12 +32,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
/** /**
* Calculate how much resources need to be preempted for each queue, * Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}. * will be used by {@link PreemptionCandidatesSelector}.
@ -126,11 +126,18 @@ public class AbstractPreemptableResourceCalculator {
TempQueuePerPartition q = i.next(); TempQueuePerPartition q = i.next();
Resource used = q.getUsed(); Resource used = q.getUsed();
Resource initIdealAssigned;
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) { if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra); initIdealAssigned =
Resources.add(q.getGuaranteed(), q.untouchableExtra);
} else { } else {
q.idealAssigned = Resources.clone(used); initIdealAssigned = Resources.clone(used);
} }
// perform initial assignment
initIdealAssignment(totGuarant, q, initIdealAssigned);
Resources.subtractFrom(unassigned, q.idealAssigned); Resources.subtractFrom(unassigned, q.idealAssigned);
// If idealAssigned < (allocated + used + pending), q needs more // If idealAssigned < (allocated + used + pending), q needs more
// resources, so // resources, so
@ -188,6 +195,21 @@ public class AbstractPreemptableResourceCalculator {
} }
} }
/**
* Stores the proposed initial ideal assignment on the given queue.
*
* This method is visible to allow sub-classes to override the initialization
* behavior. The default implementation assigns {@code initIdealAssigned} to
* {@code q.idealAssigned} as-is (the reference is stored, not copied) and
* does not read {@code totGuarant}.
*
* @param totGuarant total resources (useful for {@code ResourceCalculator}
* operations); unused by this default implementation
* @param q the {@code TempQueuePerPartition} being initialized
* @param initIdealAssigned the proposed initialization value.
*/
protected void initIdealAssignment(Resource totGuarant,
TempQueuePerPartition q, Resource initIdealAssigned) {
q.idealAssigned = initIdealAssigned;
}
/** /**
* Computes a normalizedGuaranteed capacity based on active queues. * Computes a normalizedGuaranteed capacity based on active queues.
* *

View File

@ -59,6 +59,10 @@ public class AbstractPreemptionEntity {
this.selected = Resource.newInstance(0, 0); this.selected = Resource.newInstance(0, 0);
} }
/**
 * Returns the name of the queue this preemption entity refers to.
 *
 * @return the value of the {@code queueName} field
 */
public String getQueueName() {
  return this.queueName;
}
public Resource getUsed() { public Resource getUsed() {
return current; return current;
} }

View File

@ -30,7 +30,11 @@ import java.util.Collection;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
interface CapacitySchedulerPreemptionContext { /**
* This interface provides context for the calculation of ideal allocation
* and preemption for the {@code CapacityScheduler}.
*/
public interface CapacitySchedulerPreemptionContext {
CapacityScheduler getScheduler(); CapacityScheduler getScheduler();
TempQueuePerPartition getQueueByPartition(String queueName, TempQueuePerPartition getQueueByPartition(String queueName,

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -26,11 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/** /**
* Calculate how much resources need to be preempted for each queue, * Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector} * will be used by {@link PreemptionCandidatesSelector}
@ -70,7 +70,7 @@ public class PreemptableResourceCalculator
* @param totalPreemptionAllowed total amount of preemption we allow * @param totalPreemptionAllowed total amount of preemption we allow
* @param tot_guarant the amount of capacity assigned to this pool of queues * @param tot_guarant the amount of capacity assigned to this pool of queues
*/ */
private void computeIdealResourceDistribution(ResourceCalculator rc, protected void computeIdealResourceDistribution(ResourceCalculator rc,
List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed, List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
Resource tot_guarant) { Resource tot_guarant) {
@ -138,14 +138,13 @@ public class PreemptableResourceCalculator
/** /**
* This method recursively computes the ideal assignment of resources to each * This method recursively computes the ideal assignment of resources to each
* level of the hierarchy. This ensures that leafs that are over-capacity but * level of the hierarchy. This ensures that leafs that are over-capacity but
* with parents within capacity will not be preemptionCandidates. Preemptions are allowed * with parents within capacity will not be preemptionCandidates. Preemptions
* within each subtree according to local over/under capacity. * are allowed within each subtree according to local over/under capacity.
* *
* @param root the root of the cloned queue hierarchy * @param root the root of the cloned queue hierarchy
* @param totalPreemptionAllowed maximum amount of preemption allowed * @param totalPreemptionAllowed maximum amount of preemption allowed
* @return a list of leaf queues updated with preemption targets
*/ */
private void recursivelyComputeIdealAssignment( protected void recursivelyComputeIdealAssignment(
TempQueuePerPartition root, Resource totalPreemptionAllowed) { TempQueuePerPartition root, Resource totalPreemptionAllowed) {
if (root.getChildren() != null && if (root.getChildren() != null &&
root.getChildren().size() > 0) { root.getChildren().size() > 0) {
@ -242,7 +241,7 @@ public class PreemptableResourceCalculator
// compute the ideal distribution of resources among queues // compute the ideal distribution of resources among queues
// updates cloned queues state accordingly // updates cloned queues state accordingly
tRoot.idealAssigned = tRoot.getGuaranteed(); tRoot.initializeRootIdealWithGuarangeed();
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
} }

View File

@ -18,22 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* Temporary data-structure tracking resource availability, pending resource * Temporary data-structure tracking resource availability, pending resource
* need, current utilization. This is per-queue-per-partition data structure * need, current utilization. This is per-queue-per-partition data structure
@ -74,7 +72,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// idealAssigned, used etc. // idealAssigned, used etc.
Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>(); Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
TempQueuePerPartition(String queueName, Resource current, @SuppressWarnings("checkstyle:parameternumber")
public TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable, boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource, float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
Resource reserved, CSQueue queue, Resource effMinRes, Resource reserved, CSQueue queue, Resource effMinRes,
@ -94,7 +93,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
pendingDeductReserved = Resources.createResource(0); pendingDeductReserved = Resources.createResource(0);
} }
if (ParentQueue.class.isAssignableFrom(queue.getClass())) { if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) {
parentQueue = (ParentQueue) queue; parentQueue = (ParentQueue) queue;
} }
@ -179,15 +178,14 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// Because for a satisfied parent queue, it could have some under-utilized // Because for a satisfied parent queue, it could have some under-utilized
// leaf queues. Such under-utilized leaf queue could preempt resources // leaf queues. Such under-utilized leaf queue could preempt resources
// from over-utilized leaf queue located at other hierarchies. // from over-utilized leaf queue located at other hierarchies.
if (null == children || children.isEmpty()) {
Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract( accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
idealAssigned); // accepted so far contains the "quota acceptable" amount, we now filter by
maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, // locality acceptable
maxOfGuranteedAndUsedDeductAssigned, Resources.none());
accepted = Resources.min(rc, clusterResource, accepted, accepted = acceptedByLocality(rc, accepted);
maxOfGuranteedAndUsedDeductAssigned);
}
Resource remain = Resources.subtract(avail, accepted); Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted); Resources.addTo(idealAssigned, accepted);
return remain; return remain;
@ -329,4 +327,72 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
public Map<String, TempUserPerPartition> getUsersPerPartition() { public Map<String, TempUserPerPartition> getUsersPerPartition() {
return usersPerPartition; return usersPerPartition;
} }
/**
* Replaces the pending resource tracked for this queue.
*
* The given reference is stored directly; no copy is made.
*
* @param pending the new pending resource value
*/
public void setPending(Resource pending) {
this.pending = pending;
}
/**
 * Returns the ideal assignment currently recorded for this queue.
 *
 * @return the {@code idealAssigned} field itself (not a copy)
 */
public Resource getIdealAssigned() {
  return this.idealAssigned;
}
/**
 * Builds a multi-line description of this queue and, recursively, of every
 * queue below it. Each queue contributes a newline followed by its
 * {@code toString()} output, parents before children.
 *
 * @return the concatenated description of this queue and its descendants
 */
public String toGlobalString() {
  StringBuilder sb = new StringBuilder();
  sb.append("\n").append(toString());
  // Other code in this class (filterByMaxDeductAssigned) treats a null
  // children collection as "no children"; do the same here instead of
  // failing with a NullPointerException while building a debug string.
  if (children != null) {
    for (TempQueuePerPartition c : children) {
      sb.append(c.toGlobalString());
    }
  }
  return sb.toString();
}
/**
* Returns how much of the offered resources this queue can consume once
* locality is taken into account.
*
* This method is visible to allow sub-classes to override the behavior,
* specifically to take into account locality-based limitations on how much
* the queue can consume. The default implementation applies no limitation
* and returns {@code offered} unchanged; {@code rc} is not used.
*
* @param rc the ResourceCalculator to be used.
* @param offered the input amount of Resource offered to this queue.
*
* @return the subset of Resource(s) that the queue can consume after
* accounting for locality effects.
*/
protected Resource acceptedByLocality(ResourceCalculator rc,
Resource offered) {
return offered;
}
/**
 * Limits the resources offered to a queue that has no children.
 *
 * For a queue with at least one child the offer is returned untouched.
 * Otherwise the offer is capped at
 * {@code max(used, guaranteed) - idealAssigned}, where that difference is
 * first replaced by {@code Resources.none()} if the latter is larger.
 *
 * Sub-classes may override this hook; for federation purposes the cap
 * applied here is not wanted.
 *
 * @param rc the {@code ResourceCalculator} to be used
 * @param clusterResource the total cluster resources
 * @param offered the resources offered to this queue
 * @return the amount of resources accepted after considering max and
 *         deducting assigned
 */
protected Resource filterByMaxDeductAssigned(ResourceCalculator rc,
    Resource clusterResource, Resource offered) {
  boolean hasChildren = children != null && !children.isEmpty();
  if (hasChildren) {
    // Only queues without children are capped.
    return offered;
  }

  Resource ceiling =
      Resources.max(rc, clusterResource, getUsed(), getGuaranteed());
  Resource headroom = Resources.subtract(ceiling, idealAssigned);
  Resource boundedHeadroom =
      Resources.max(rc, clusterResource, headroom, Resources.none());
  return Resources.min(rc, clusterResource, offered, boundedHeadroom);
}
/**
 * Sets this queue's ideal assignment to a clone of its guaranteed
 * resources.
 *
 * This method is visible to allow sub-classes to override the behavior;
 * for federation purposes the per-sub-cluster roots need to be initialized
 * as well as the global one.
 */
protected void initializeRootIdealWithGuarangeed() {
  this.idealAssigned = Resources.clone(this.getGuaranteed());
}
} }

View File

@ -70,7 +70,7 @@ public class ResourceInfo {
@Override @Override
public String toString() { public String toString() {
return resources.toString(); return getResource().toString();
} }
public void setMemory(int memory) { public void setMemory(int memory) {
@ -90,6 +90,9 @@ public class ResourceInfo {
} }
public Resource getResource() { public Resource getResource() {
if (resources == null) {
resources = Resource.newInstance(memory, vCores);
}
return Resource.newInstance(resources); return Resource.newInstance(resources);
} }