YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan
This commit is contained in:
parent
dcc5455e07
commit
d497f6ea2b
|
@ -102,6 +102,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
|
||||
|
||||
YARN-2498. Respect labels in preemption policy of capacity scheduler for
|
||||
inter-queue preemption. (Wangda Tan via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -26,11 +27,10 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.collections.map.HashedMap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
* This class implement a {@link SchedulingEditPolicy} that is designed to be
|
||||
|
@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
private float percentageClusterPreemptionAllowed;
|
||||
private double naturalTerminationFactor;
|
||||
private boolean observeOnly;
|
||||
private Map<NodeId, Set<String>> labels;
|
||||
private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
|
||||
new HashMap<>();
|
||||
private RMNodeLabelsManager nlm;
|
||||
|
||||
public ProportionalCapacityPreemptionPolicy() {
|
||||
clock = new SystemClock();
|
||||
|
@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
|
||||
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
|
||||
rc = scheduler.getResourceCalculator();
|
||||
labels = null;
|
||||
nlm = scheduler.getRMContext().getNodeLabelManager();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
public void editSchedule() {
|
||||
CSQueue root = scheduler.getRootQueue();
|
||||
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
|
||||
clusterResources = getNonLabeledResources(clusterResources);
|
||||
setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
|
||||
.getNodeLabels());
|
||||
containerBasedPreemptOrKill(root, clusterResources);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setting Node Labels
|
||||
*
|
||||
* @param nodelabels
|
||||
*/
|
||||
public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
|
||||
labels = nodelabels;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method returns all non labeled resources.
|
||||
*
|
||||
* @param clusterResources
|
||||
* @return Resources
|
||||
*/
|
||||
private Resource getNonLabeledResources(Resource clusterResources) {
|
||||
RMContext rmcontext = scheduler.getRMContext();
|
||||
RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
|
||||
Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
|
||||
clusterResources);
|
||||
return res == null ? clusterResources : res;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method selects and tracks containers to be preempted. If a container
|
||||
|
@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
*/
|
||||
private void containerBasedPreemptOrKill(CSQueue root,
|
||||
Resource clusterResources) {
|
||||
// All partitions to look at
|
||||
Set<String> allPartitions = new HashSet<>();
|
||||
allPartitions.addAll(scheduler.getRMContext()
|
||||
.getNodeLabelManager().getClusterNodeLabelNames());
|
||||
allPartitions.add(RMNodeLabelsManager.NO_LABEL);
|
||||
|
||||
// extract a summary of the queues from scheduler
|
||||
TempQueue tRoot;
|
||||
synchronized (scheduler) {
|
||||
tRoot = cloneQueues(root, clusterResources);
|
||||
queueToPartitions.clear();
|
||||
|
||||
for (String partitionToLookAt : allPartitions) {
|
||||
cloneQueues(root,
|
||||
nlm.getResourceByLabel(partitionToLookAt, clusterResources),
|
||||
partitionToLookAt);
|
||||
}
|
||||
}
|
||||
|
||||
// compute the ideal distribution of resources among queues
|
||||
// updates cloned queues state accordingly
|
||||
tRoot.idealAssigned = tRoot.guaranteed;
|
||||
// compute total preemption allowed
|
||||
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
|
||||
percentageClusterPreemptionAllowed);
|
||||
List<TempQueue> queues =
|
||||
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
|
||||
|
||||
Set<String> leafQueueNames = null;
|
||||
for (String partition : allPartitions) {
|
||||
TempQueuePerPartition tRoot =
|
||||
getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
|
||||
// compute the ideal distribution of resources among queues
|
||||
// updates cloned queues state accordingly
|
||||
tRoot.idealAssigned = tRoot.guaranteed;
|
||||
|
||||
leafQueueNames =
|
||||
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
|
||||
}
|
||||
|
||||
// based on ideal allocation select containers to be preempted from each
|
||||
// queue and each application
|
||||
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
|
||||
getContainersToPreempt(queues, clusterResources);
|
||||
getContainersToPreempt(leafQueueNames, clusterResources);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
logToCSV(queues);
|
||||
logToCSV(new ArrayList<String>(leafQueueNames));
|
||||
}
|
||||
|
||||
// if we are in observeOnly mode return before any action is taken
|
||||
|
@ -252,6 +248,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// preempt (or kill) the selected containers
|
||||
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
|
||||
: toPreempt.entrySet()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Send to scheduler: in app=" + e.getKey()
|
||||
+ " #containers-to-be-preempted=" + e.getValue().size());
|
||||
}
|
||||
for (RMContainer container : e.getValue()) {
|
||||
// if we tried to preempt this for more than maxWaitTime
|
||||
if (preempted.get(container) != null &&
|
||||
|
@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* @param totalPreemptionAllowed maximum amount of preemption allowed
|
||||
* @return a list of leaf queues updated with preemption targets
|
||||
*/
|
||||
private List<TempQueue> recursivelyComputeIdealAssignment(
|
||||
TempQueue root, Resource totalPreemptionAllowed) {
|
||||
List<TempQueue> leafs = new ArrayList<TempQueue>();
|
||||
private Set<String> recursivelyComputeIdealAssignment(
|
||||
TempQueuePerPartition root, Resource totalPreemptionAllowed) {
|
||||
Set<String> leafQueueNames = new HashSet<>();
|
||||
if (root.getChildren() != null &&
|
||||
root.getChildren().size() > 0) {
|
||||
// compute ideal distribution at this level
|
||||
computeIdealResourceDistribution(rc, root.getChildren(),
|
||||
totalPreemptionAllowed, root.idealAssigned);
|
||||
// compute recursively for lower levels and build list of leafs
|
||||
for(TempQueue t : root.getChildren()) {
|
||||
leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
|
||||
for(TempQueuePerPartition t : root.getChildren()) {
|
||||
leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
|
||||
totalPreemptionAllowed));
|
||||
}
|
||||
} else {
|
||||
// we are in a leaf nothing to do, just return yourself
|
||||
return Collections.singletonList(root);
|
||||
return ImmutableSet.of(root.queueName);
|
||||
}
|
||||
return leafs;
|
||||
return leafQueueNames;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* @param tot_guarant the amount of capacity assigned to this pool of queues
|
||||
*/
|
||||
private void computeIdealResourceDistribution(ResourceCalculator rc,
|
||||
List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
|
||||
List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
|
||||
Resource tot_guarant) {
|
||||
|
||||
// qAlloc tracks currently active queues (will decrease progressively as
|
||||
// demand is met)
|
||||
List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
|
||||
List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
|
||||
// unassigned tracks how much resources are still to assign, initialized
|
||||
// with the total capacity for this set of queues
|
||||
Resource unassigned = Resources.clone(tot_guarant);
|
||||
|
||||
// group queues based on whether they have non-zero guaranteed capacity
|
||||
Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
|
||||
Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
|
||||
Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
|
||||
Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
|
||||
|
||||
for (TempQueue q : qAlloc) {
|
||||
for (TempQueuePerPartition q : qAlloc) {
|
||||
if (Resources
|
||||
.greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
|
||||
nonZeroGuarQueues.add(q);
|
||||
|
@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// based on ideal assignment computed above and current assignment we derive
|
||||
// how much preemption is required overall
|
||||
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
|
||||
for (TempQueue t:queues) {
|
||||
for (TempQueuePerPartition t:queues) {
|
||||
if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
|
||||
Resources.addTo(totPreemptionNeeded,
|
||||
Resources.subtract(t.current, t.idealAssigned));
|
||||
|
@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
|
||||
// assign to each queue the amount of actual preemption based on local
|
||||
// information of ideal preemption and scaling factor
|
||||
for (TempQueue t : queues) {
|
||||
for (TempQueuePerPartition t : queues) {
|
||||
t.assignPreemption(scalingFactor, rc, tot_guarant);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
long time = clock.getTime();
|
||||
for (TempQueue t : queues) {
|
||||
for (TempQueuePerPartition t : queues) {
|
||||
LOG.debug(time + ": " + t);
|
||||
}
|
||||
}
|
||||
|
@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* distributed uniformly.
|
||||
*/
|
||||
private void computeFixpointAllocation(ResourceCalculator rc,
|
||||
Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
|
||||
boolean ignoreGuarantee) {
|
||||
Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
|
||||
Resource unassigned, boolean ignoreGuarantee) {
|
||||
// Prior to assigning the unused resources, process each queue as follows:
|
||||
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
|
||||
// Else idealAssigned = current;
|
||||
|
@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// idealAssigned >= current + pending), remove it from consideration.
|
||||
// Sort queues from most under-guaranteed to most over-guaranteed.
|
||||
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
|
||||
PriorityQueue<TempQueue> orderedByNeed =
|
||||
new PriorityQueue<TempQueue>(10,tqComparator);
|
||||
for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
|
||||
TempQueue q = i.next();
|
||||
PriorityQueue<TempQueuePerPartition> orderedByNeed =
|
||||
new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
|
||||
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
|
||||
TempQueuePerPartition q = i.next();
|
||||
if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
|
||||
q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
|
||||
} else {
|
||||
|
@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// place it back in the ordered list of queues, recalculating its place
|
||||
// in the order of most under-guaranteed to most over-guaranteed. In this
|
||||
// way, the most underserved queue(s) are always given resources first.
|
||||
Collection<TempQueue> underserved =
|
||||
Collection<TempQueuePerPartition> underserved =
|
||||
getMostUnderservedQueues(orderedByNeed, tqComparator);
|
||||
for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
|
||||
TempQueue sub = i.next();
|
||||
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
|
||||
.hasNext();) {
|
||||
TempQueuePerPartition sub = i.next();
|
||||
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
|
||||
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
|
||||
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
|
||||
|
@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// Take the most underserved TempQueue (the one on the head). Collect and
|
||||
// return the list of all queues that have the same idealAssigned
|
||||
// percentage of guaranteed.
|
||||
protected Collection<TempQueue> getMostUnderservedQueues(
|
||||
PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
|
||||
ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
|
||||
protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
|
||||
PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
|
||||
ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
|
||||
while (!orderedByNeed.isEmpty()) {
|
||||
TempQueue q1 = orderedByNeed.remove();
|
||||
TempQueuePerPartition q1 = orderedByNeed.remove();
|
||||
underserved.add(q1);
|
||||
TempQueue q2 = orderedByNeed.peek();
|
||||
TempQueuePerPartition q2 = orderedByNeed.peek();
|
||||
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
|
||||
// return what has already been collected. Otherwise, q1's pct of
|
||||
// guaranteed == that of q2, so add q2 to underserved list during the
|
||||
|
@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* @param queues the list of queues to consider
|
||||
*/
|
||||
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
|
||||
Collection<TempQueue> queues, boolean ignoreGuar) {
|
||||
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
|
||||
Resource activeCap = Resource.newInstance(0, 0);
|
||||
|
||||
if (ignoreGuar) {
|
||||
for (TempQueue q : queues) {
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
|
||||
}
|
||||
} else {
|
||||
for (TempQueue q : queues) {
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
Resources.addTo(activeCap, q.guaranteed);
|
||||
}
|
||||
for (TempQueue q : queues) {
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
|
||||
q.guaranteed, activeCap);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getPartitionByNodeId(NodeId nodeId) {
|
||||
return scheduler.getSchedulerNode(nodeId).getPartition();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return should we preempt rmContainer. If we should, deduct from
|
||||
* <code>resourceToObtainByPartition</code>
|
||||
*/
|
||||
private boolean tryPreemptContainerAndDeductResToObtain(
|
||||
Map<String, Resource> resourceToObtainByPartitions,
|
||||
RMContainer rmContainer, Resource clusterResource,
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
|
||||
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
|
||||
|
||||
// We will not account resource of a container twice or more
|
||||
if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
|
||||
Resource toObtainByPartition =
|
||||
resourceToObtainByPartitions.get(nodePartition);
|
||||
|
||||
if (null != toObtainByPartition
|
||||
&& Resources.greaterThan(rc, clusterResource, toObtainByPartition,
|
||||
Resources.none())) {
|
||||
Resources.subtractFrom(toObtainByPartition,
|
||||
rmContainer.getAllocatedResource());
|
||||
// When we have no more resource need to obtain, remove from map.
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
|
||||
Resources.none())) {
|
||||
resourceToObtainByPartitions.remove(nodePartition);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Marked container=" + rmContainer.getContainerId()
|
||||
+ " in partition=" + nodePartition + " will be preempted");
|
||||
}
|
||||
// Add to preemptMap
|
||||
addToPreemptMap(preemptMap, attemptId, rmContainer);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean preemptMapContains(
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||
ApplicationAttemptId attemptId, RMContainer rmContainer) {
|
||||
Set<RMContainer> rmContainers;
|
||||
if (null == (rmContainers = preemptMap.get(attemptId))) {
|
||||
return false;
|
||||
}
|
||||
return rmContainers.contains(rmContainer);
|
||||
}
|
||||
|
||||
private void addToPreemptMap(
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
|
||||
Set<RMContainer> set;
|
||||
if (null == (set = preemptMap.get(appAttemptId))) {
|
||||
set = new HashSet<RMContainer>();
|
||||
preemptMap.put(appAttemptId, set);
|
||||
}
|
||||
set.add(containerToPreempt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Based a resource preemption target drop reservations of containers and
|
||||
* if necessary select containers for preemption from applications in each
|
||||
|
@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* @return a map of applciationID to set of containers to preempt
|
||||
*/
|
||||
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
|
||||
List<TempQueue> queues, Resource clusterResource) {
|
||||
Set<String> leafQueueNames, Resource clusterResource) {
|
||||
|
||||
Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
|
||||
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
|
||||
new HashMap<ApplicationAttemptId, Set<RMContainer>>();
|
||||
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
|
||||
|
||||
for (TempQueue qT : queues) {
|
||||
if (qT.preemptionDisabled && qT.leafQueue != null) {
|
||||
// Loop all leaf queues
|
||||
for (String queueName : leafQueueNames) {
|
||||
// check if preemption disabled for the queue
|
||||
if (getQueueByPartition(queueName,
|
||||
RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (Resources.greaterThan(rc, clusterResource,
|
||||
qT.toBePreempted, Resource.newInstance(0, 0))) {
|
||||
LOG.debug("Tried to preempt the following "
|
||||
+ "resources from non-preemptable queue: "
|
||||
+ qT.queueName + " - Resources: " + qT.toBePreempted);
|
||||
}
|
||||
LOG.debug("skipping from queue=" + queueName
|
||||
+ " because it's a non-preemptable queue");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// we act only if we are violating balance by more than
|
||||
// maxIgnoredOverCapacity
|
||||
if (Resources.greaterThan(rc, clusterResource, qT.current,
|
||||
Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
|
||||
// we introduce a dampening factor naturalTerminationFactor that
|
||||
// accounts for natural termination of containers
|
||||
Resource resToObtain =
|
||||
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
|
||||
Resource skippedAMSize = Resource.newInstance(0, 0);
|
||||
|
||||
// lock the leafqueue while we scan applications and unreserve
|
||||
synchronized (qT.leafQueue) {
|
||||
Iterator<FiCaSchedulerApp> desc =
|
||||
qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
|
||||
qT.actuallyPreempted = Resources.clone(resToObtain);
|
||||
while (desc.hasNext()) {
|
||||
FiCaSchedulerApp fc = desc.next();
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
|
||||
Resources.none())) {
|
||||
break;
|
||||
// compute resToObtainByPartition considered inter-queue preemption
|
||||
LeafQueue leafQueue = null;
|
||||
|
||||
Map<String, Resource> resToObtainByPartition =
|
||||
new HashMap<String, Resource>();
|
||||
for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
|
||||
leafQueue = qT.leafQueue;
|
||||
// we act only if we are violating balance by more than
|
||||
// maxIgnoredOverCapacity
|
||||
if (Resources.greaterThan(rc, clusterResource, qT.current,
|
||||
Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
|
||||
// we introduce a dampening factor naturalTerminationFactor that
|
||||
// accounts for natural termination of containers
|
||||
Resource resToObtain =
|
||||
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
|
||||
// Only add resToObtain when it >= 0
|
||||
if (Resources.greaterThan(rc, clusterResource, resToObtain,
|
||||
Resources.none())) {
|
||||
resToObtainByPartition.put(qT.partition, resToObtain);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Queue=" + queueName + " partition=" + qT.partition
|
||||
+ " resource-to-obtain=" + resToObtain);
|
||||
}
|
||||
preemptMap.put(
|
||||
fc.getApplicationAttemptId(),
|
||||
preemptFrom(fc, clusterResource, resToObtain,
|
||||
skippedAMContainerlist, skippedAMSize));
|
||||
}
|
||||
Resource maxAMCapacityForThisQueue = Resources.multiply(
|
||||
Resources.multiply(clusterResource,
|
||||
qT.leafQueue.getAbsoluteCapacity()),
|
||||
qT.leafQueue.getMaxAMResourcePerQueuePercent());
|
||||
|
||||
// Can try preempting AMContainers (still saving atmost
|
||||
// maxAMCapacityForThisQueue AMResource's) if more resources are
|
||||
// required to be preempted from this Queue.
|
||||
preemptAMContainers(clusterResource, preemptMap,
|
||||
skippedAMContainerlist, resToObtain, skippedAMSize,
|
||||
maxAMCapacityForThisQueue);
|
||||
qT.actuallyPreempted = Resources.clone(resToObtain);
|
||||
} else {
|
||||
qT.actuallyPreempted = Resources.none();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (leafQueue) {
|
||||
// go through all ignore-partition-exclusivity containers first to make
|
||||
// sure such containers will be preempted first
|
||||
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
|
||||
leafQueue.getIgnoreExclusivityRMContainers();
|
||||
for (String partition : resToObtainByPartition.keySet()) {
|
||||
if (ignorePartitionExclusivityContainers.containsKey(partition)) {
|
||||
TreeSet<RMContainer> rmContainers =
|
||||
ignorePartitionExclusivityContainers.get(partition);
|
||||
// We will check container from reverse order, so latter submitted
|
||||
// application's containers will be preempted first.
|
||||
for (RMContainer c : rmContainers.descendingSet()) {
|
||||
boolean preempted =
|
||||
tryPreemptContainerAndDeductResToObtain(
|
||||
resToObtainByPartition, c, clusterResource, preemptMap);
|
||||
if (!preempted) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// preempt other containers
|
||||
Resource skippedAMSize = Resource.newInstance(0, 0);
|
||||
Iterator<FiCaSchedulerApp> desc =
|
||||
leafQueue.getOrderingPolicy().getPreemptionIterator();
|
||||
while (desc.hasNext()) {
|
||||
FiCaSchedulerApp fc = desc.next();
|
||||
// When we complete preempt from one partition, we will remove from
|
||||
// resToObtainByPartition, so when it becomes empty, we can get no
|
||||
// more preemption is needed
|
||||
if (resToObtainByPartition.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
preemptFrom(fc, clusterResource, resToObtainByPartition,
|
||||
skippedAMContainerlist, skippedAMSize, preemptMap);
|
||||
}
|
||||
|
||||
// Can try preempting AMContainers (still saving atmost
|
||||
// maxAMCapacityForThisQueue AMResource's) if more resources are
|
||||
// required to be preempted from this Queue.
|
||||
Resource maxAMCapacityForThisQueue = Resources.multiply(
|
||||
Resources.multiply(clusterResource,
|
||||
leafQueue.getAbsoluteCapacity()),
|
||||
leafQueue.getMaxAMResourcePerQueuePercent());
|
||||
|
||||
preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
|
||||
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue);
|
||||
}
|
||||
}
|
||||
|
||||
return preemptMap;
|
||||
}
|
||||
|
||||
|
@ -595,31 +706,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
*/
|
||||
private void preemptAMContainers(Resource clusterResource,
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||
List<RMContainer> skippedAMContainerlist, Resource resToObtain,
|
||||
Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
|
||||
List<RMContainer> skippedAMContainerlist,
|
||||
Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
|
||||
Resource maxAMCapacityForThisQueue) {
|
||||
for (RMContainer c : skippedAMContainerlist) {
|
||||
// Got required amount of resources for preemption, can stop now
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
|
||||
Resources.none())) {
|
||||
if (resToObtainByPartition.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
// Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
|
||||
// container selection iteration for preemption will be stopped.
|
||||
// container selection iteration for preemption will be stopped.
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
|
||||
maxAMCapacityForThisQueue)) {
|
||||
break;
|
||||
}
|
||||
Set<RMContainer> contToPrempt = preemptMap.get(c
|
||||
.getApplicationAttemptId());
|
||||
if (null == contToPrempt) {
|
||||
contToPrempt = new HashSet<RMContainer>();
|
||||
preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
|
||||
|
||||
boolean preempted =
|
||||
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||
clusterResource, preemptMap);
|
||||
if (preempted) {
|
||||
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
|
||||
}
|
||||
contToPrempt.add(c);
|
||||
|
||||
Resources.subtractFrom(resToObtain, c.getContainer().getResource());
|
||||
Resources.subtractFrom(skippedAMSize, c.getContainer()
|
||||
.getResource());
|
||||
}
|
||||
skippedAMContainerlist.clear();
|
||||
}
|
||||
|
@ -627,71 +734,59 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
/**
|
||||
* Given a target preemption for a specific application, select containers
|
||||
* to preempt (after unreserving all reservation for that app).
|
||||
*
|
||||
* @param app
|
||||
* @param clusterResource
|
||||
* @param rsrcPreempt
|
||||
* @return Set<RMContainer> Set of RMContainers
|
||||
*/
|
||||
private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
|
||||
Resource clusterResource, Resource rsrcPreempt,
|
||||
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
|
||||
Set<RMContainer> ret = new HashSet<RMContainer>();
|
||||
private void preemptFrom(FiCaSchedulerApp app,
|
||||
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
|
||||
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
|
||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
|
||||
ApplicationAttemptId appId = app.getApplicationAttemptId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Looking at application=" + app.getApplicationAttemptId()
|
||||
+ " resourceToObtain=" + resToObtainByPartition);
|
||||
}
|
||||
|
||||
// first drop reserved containers towards rsrcPreempt
|
||||
List<RMContainer> reservations =
|
||||
List<RMContainer> reservedContainers =
|
||||
new ArrayList<RMContainer>(app.getReservedContainers());
|
||||
for (RMContainer c : reservations) {
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource,
|
||||
rsrcPreempt, Resources.none())) {
|
||||
return ret;
|
||||
for (RMContainer c : reservedContainers) {
|
||||
if (resToObtainByPartition.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Try to preempt this container
|
||||
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||
clusterResource, preemptMap);
|
||||
|
||||
if (!observeOnly) {
|
||||
dispatcher.handle(new ContainerPreemptEvent(appId, c,
|
||||
ContainerPreemptEventType.DROP_RESERVATION));
|
||||
}
|
||||
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
|
||||
}
|
||||
|
||||
// if more resources are to be freed go through all live containers in
|
||||
// reverse priority and reverse allocation order and mark them for
|
||||
// preemption
|
||||
List<RMContainer> containers =
|
||||
List<RMContainer> liveContainers =
|
||||
new ArrayList<RMContainer>(app.getLiveContainers());
|
||||
|
||||
sortContainers(containers);
|
||||
sortContainers(liveContainers);
|
||||
|
||||
for (RMContainer c : containers) {
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource,
|
||||
rsrcPreempt, Resources.none())) {
|
||||
return ret;
|
||||
for (RMContainer c : liveContainers) {
|
||||
if (resToObtainByPartition.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip AM Container from preemption for now.
|
||||
if (c.isAMContainer()) {
|
||||
skippedAMContainerlist.add(c);
|
||||
Resources.addTo(skippedAMSize, c.getContainer().getResource());
|
||||
Resources.addTo(skippedAMSize, c.getAllocatedResource());
|
||||
continue;
|
||||
}
|
||||
// skip Labeled resource
|
||||
if(isLabeledContainer(c)){
|
||||
continue;
|
||||
}
|
||||
ret.add(c);
|
||||
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checking if given container is a labeled container
|
||||
*
|
||||
* @param c
|
||||
* @return true/false
|
||||
*/
|
||||
private boolean isLabeledContainer(RMContainer c) {
|
||||
return labels.containsKey(c.getAllocatedNode());
|
||||
// Try to preempt this container
|
||||
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
|
||||
clusterResource, preemptMap);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* the leaves. Finally it aggregates pending resources in each queue and rolls
|
||||
* it up to higher levels.
|
||||
*
|
||||
* @param root the root of the CapacityScheduler queue hierarchy
|
||||
* @param clusterResources the total amount of resources in the cluster
|
||||
* @param curQueue current queue which I'm looking at now
|
||||
* @param partitionResource the total amount of resources in the cluster
|
||||
* @return the root of the cloned queue hierarchy
|
||||
*/
|
||||
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
|
||||
TempQueue ret;
|
||||
synchronized (root) {
|
||||
String queueName = root.getQueueName();
|
||||
float absUsed = root.getAbsoluteUsedCapacity();
|
||||
float absCap = root.getAbsoluteCapacity();
|
||||
float absMaxCap = root.getAbsoluteMaximumCapacity();
|
||||
boolean preemptionDisabled = root.getPreemptionDisabled();
|
||||
private TempQueuePerPartition cloneQueues(CSQueue curQueue,
|
||||
Resource partitionResource, String partitionToLookAt) {
|
||||
TempQueuePerPartition ret;
|
||||
synchronized (curQueue) {
|
||||
String queueName = curQueue.getQueueName();
|
||||
QueueCapacities qc = curQueue.getQueueCapacities();
|
||||
float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
|
||||
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
|
||||
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
|
||||
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
|
||||
|
||||
Resource current = Resources.multiply(clusterResources, absUsed);
|
||||
Resource guaranteed = Resources.multiply(clusterResources, absCap);
|
||||
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
|
||||
Resource current = Resources.multiply(partitionResource, absUsed);
|
||||
Resource guaranteed = Resources.multiply(partitionResource, absCap);
|
||||
Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
|
||||
|
||||
// when partition is a non-exclusive partition, the actual maxCapacity
|
||||
// could more than specified maxCapacity
|
||||
try {
|
||||
if (!scheduler.getRMContext().getNodeLabelManager()
|
||||
.isExclusiveNodeLabel(partitionToLookAt)) {
|
||||
maxCapacity =
|
||||
Resources.max(rc, partitionResource, maxCapacity, current);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// This may cause by partition removed when running capacity monitor,
|
||||
// just ignore the error, this will be corrected when doing next check.
|
||||
}
|
||||
|
||||
Resource extra = Resource.newInstance(0, 0);
|
||||
if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
|
||||
if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
|
||||
extra = Resources.subtract(current, guaranteed);
|
||||
}
|
||||
if (root instanceof LeafQueue) {
|
||||
LeafQueue l = (LeafQueue) root;
|
||||
Resource pending = l.getTotalResourcePending();
|
||||
ret = new TempQueue(queueName, current, pending, guaranteed,
|
||||
maxCapacity, preemptionDisabled);
|
||||
if (curQueue instanceof LeafQueue) {
|
||||
LeafQueue l = (LeafQueue) curQueue;
|
||||
Resource pending =
|
||||
l.getQueueResourceUsage().getPending(partitionToLookAt);
|
||||
ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
|
||||
maxCapacity, preemptionDisabled, partitionToLookAt);
|
||||
if (preemptionDisabled) {
|
||||
ret.untouchableExtra = extra;
|
||||
} else {
|
||||
|
@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
ret.setLeafQueue(l);
|
||||
} else {
|
||||
Resource pending = Resource.newInstance(0, 0);
|
||||
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
|
||||
maxCapacity, false);
|
||||
ret =
|
||||
new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
|
||||
guaranteed, maxCapacity, false, partitionToLookAt);
|
||||
Resource childrensPreemptable = Resource.newInstance(0, 0);
|
||||
for (CSQueue c : root.getChildQueues()) {
|
||||
TempQueue subq = cloneQueues(c, clusterResources);
|
||||
for (CSQueue c : curQueue.getChildQueues()) {
|
||||
TempQueuePerPartition subq =
|
||||
cloneQueues(c, partitionResource, partitionToLookAt);
|
||||
Resources.addTo(childrensPreemptable, subq.preemptableExtra);
|
||||
ret.addChild(subq);
|
||||
}
|
||||
// untouchableExtra = max(extra - childrenPreemptable, 0)
|
||||
if (Resources.greaterThanOrEqual(
|
||||
rc, clusterResources, childrensPreemptable, extra)) {
|
||||
rc, partitionResource, childrensPreemptable, extra)) {
|
||||
ret.untouchableExtra = Resource.newInstance(0, 0);
|
||||
} else {
|
||||
ret.untouchableExtra =
|
||||
|
@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
}
|
||||
}
|
||||
}
|
||||
addTempQueuePartition(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// simple printout function that reports internal queue state (useful for
|
||||
// plotting)
|
||||
private void logToCSV(List<TempQueue> unorderedqueues){
|
||||
List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
|
||||
Collections.sort(queues, new Comparator<TempQueue>(){
|
||||
@Override
|
||||
public int compare(TempQueue o1, TempQueue o2) {
|
||||
return o1.queueName.compareTo(o2.queueName);
|
||||
}});
|
||||
private void logToCSV(List<String> leafQueueNames){
|
||||
Collections.sort(leafQueueNames);
|
||||
String queueState = " QUEUESTATE: " + clock.getTime();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(queueState);
|
||||
for (TempQueue tq : queues) {
|
||||
|
||||
for (String queueName : leafQueueNames) {
|
||||
TempQueuePerPartition tq =
|
||||
getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
|
||||
sb.append(", ");
|
||||
tq.appendLogString(sb);
|
||||
}
|
||||
LOG.debug(sb.toString());
|
||||
}
|
||||
|
||||
private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
|
||||
String queueName = queuePartition.queueName;
|
||||
|
||||
Map<String, TempQueuePerPartition> queuePartitions;
|
||||
if (null == (queuePartitions = queueToPartitions.get(queueName))) {
|
||||
queuePartitions = new HashMap<String, TempQueuePerPartition>();
|
||||
queueToPartitions.put(queueName, queuePartitions);
|
||||
}
|
||||
queuePartitions.put(queuePartition.partition, queuePartition);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue partition by given queueName and partitionName
|
||||
*/
|
||||
private TempQueuePerPartition getQueueByPartition(String queueName,
|
||||
String partition) {
|
||||
Map<String, TempQueuePerPartition> partitionToQueues = null;
|
||||
if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
|
||||
return null;
|
||||
}
|
||||
return partitionToQueues.get(partition);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all queue partitions by given queueName
|
||||
*/
|
||||
private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
|
||||
if (!queueToPartitions.containsKey(queueName)) {
|
||||
return null;
|
||||
}
|
||||
return queueToPartitions.get(queueName).values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporary data-structure tracking resource availability, pending resource
|
||||
* need, current utilization. Used to clone {@link CSQueue}.
|
||||
* need, current utilization. This is per-queue-per-partition data structure
|
||||
*/
|
||||
static class TempQueue {
|
||||
static class TempQueuePerPartition {
|
||||
final String queueName;
|
||||
final Resource current;
|
||||
final Resource pending;
|
||||
final Resource guaranteed;
|
||||
final Resource maxCapacity;
|
||||
final String partition;
|
||||
Resource idealAssigned;
|
||||
Resource toBePreempted;
|
||||
// For logging purpose
|
||||
Resource actuallyPreempted;
|
||||
Resource untouchableExtra;
|
||||
Resource preemptableExtra;
|
||||
|
||||
double normalizedGuarantee;
|
||||
|
||||
final ArrayList<TempQueue> children;
|
||||
final ArrayList<TempQueuePerPartition> children;
|
||||
LeafQueue leafQueue;
|
||||
boolean preemptionDisabled;
|
||||
|
||||
TempQueue(String queueName, Resource current, Resource pending,
|
||||
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
|
||||
TempQueuePerPartition(String queueName, Resource current, Resource pending,
|
||||
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
|
||||
String partition) {
|
||||
this.queueName = queueName;
|
||||
this.current = current;
|
||||
this.pending = pending;
|
||||
|
@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
this.actuallyPreempted = Resource.newInstance(0, 0);
|
||||
this.toBePreempted = Resource.newInstance(0, 0);
|
||||
this.normalizedGuarantee = Float.NaN;
|
||||
this.children = new ArrayList<TempQueue>();
|
||||
this.children = new ArrayList<TempQueuePerPartition>();
|
||||
this.untouchableExtra = Resource.newInstance(0, 0);
|
||||
this.preemptableExtra = Resource.newInstance(0, 0);
|
||||
this.preemptionDisabled = preemptionDisabled;
|
||||
this.partition = partition;
|
||||
}
|
||||
|
||||
public void setLeafQueue(LeafQueue l){
|
||||
|
@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
* When adding a child we also aggregate its pending resource needs.
|
||||
* @param q the child queue to add to this queue
|
||||
*/
|
||||
public void addChild(TempQueue q) {
|
||||
public void addChild(TempQueuePerPartition q) {
|
||||
assert leafQueue == null;
|
||||
children.add(q);
|
||||
Resources.addTo(pending, q.pending);
|
||||
}
|
||||
|
||||
public void addChildren(ArrayList<TempQueue> queues) {
|
||||
public void addChildren(ArrayList<TempQueuePerPartition> queues) {
|
||||
assert leafQueue == null;
|
||||
children.addAll(queues);
|
||||
}
|
||||
|
||||
|
||||
public ArrayList<TempQueue> getChildren(){
|
||||
public ArrayList<TempQueuePerPartition> getChildren(){
|
||||
return children;
|
||||
}
|
||||
|
||||
|
@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
|
||||
public void printAll() {
|
||||
LOG.info(this.toString());
|
||||
for (TempQueue sub : this.getChildren()) {
|
||||
for (TempQueuePerPartition sub : this.getChildren()) {
|
||||
sub.printAll();
|
||||
}
|
||||
}
|
||||
|
@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
|
||||
}
|
||||
|
||||
static class TQComparator implements Comparator<TempQueue> {
|
||||
static class TQComparator implements Comparator<TempQueuePerPartition> {
|
||||
private ResourceCalculator rc;
|
||||
private Resource clusterRes;
|
||||
|
||||
|
@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
}
|
||||
|
||||
@Override
|
||||
public int compare(TempQueue tq1, TempQueue tq2) {
|
||||
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
|
||||
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// Calculates idealAssigned / guaranteed
|
||||
// TempQueues with 0 guarantees are always considered the most over
|
||||
// capacity and therefore considered last for resources.
|
||||
private double getIdealPctOfGuaranteed(TempQueue q) {
|
||||
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
|
||||
double pctOver = Integer.MAX_VALUE;
|
||||
if (q != null && Resources.greaterThan(
|
||||
rc, clusterRes, q.guaranteed, Resources.none())) {
|
||||
|
|
|
@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
|||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class RMContainerImpl implements RMContainer {
|
||||
public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
|
||||
|
||||
|
@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer {
|
|||
}
|
||||
return nodeLabelExpression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof RMContainer) {
|
||||
if (null != getContainerId()) {
|
||||
return getContainerId().equals(((RMContainer) obj).getContainerId());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
if (null != getContainerId()) {
|
||||
return getContainerId().hashCode();
|
||||
}
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RMContainer o) {
|
||||
if (containerId != null && o.getContainerId() != null) {
|
||||
return containerId.compareTo(o.getContainerId());
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ public class CapacityScheduler extends
|
|||
static final PartitionedQueueComparator partitionedQueueComparator =
|
||||
new PartitionedQueueComparator();
|
||||
|
||||
static final Comparator<FiCaSchedulerApp> applicationComparator =
|
||||
public static final Comparator<FiCaSchedulerApp> applicationComparator =
|
||||
new Comparator<FiCaSchedulerApp>() {
|
||||
@Override
|
||||
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
|
||||
|
|
|
@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
||||
|
@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
|
||||
new QueueResourceLimitsInfo();
|
||||
|
||||
|
||||
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
|
||||
|
||||
private OrderingPolicy<FiCaSchedulerApp>
|
||||
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
|
||||
|
||||
// record all ignore partition exclusivityRMContainer, this will be used to do
|
||||
// preemption, key is the partition of the RMContainer allocated on
|
||||
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
|
||||
new HashMap<>();
|
||||
|
||||
public LeafQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
|
@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
Resource assigned = assignment.getResource();
|
||||
if (Resources.greaterThan(
|
||||
resourceCalculator, clusterResource, assigned, Resources.none())) {
|
||||
// Get reserved or allocated container from application
|
||||
RMContainer reservedOrAllocatedRMContainer =
|
||||
application.getRMContainer(assignment
|
||||
.getAssignmentInformation()
|
||||
.getFirstAllocatedOrReservedContainerId());
|
||||
|
||||
// Book-keeping
|
||||
// Note: Update headroom to account for current allocation too...
|
||||
allocateResource(clusterResource, application, assigned,
|
||||
node.getPartition());
|
||||
node.getPartition(), reservedOrAllocatedRMContainer);
|
||||
|
||||
// Don't reset scheduling opportunities for offswitch assignments
|
||||
// otherwise the app will be delayed for each non-local assignment.
|
||||
|
@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
orderingPolicy.containerReleased(application, rmContainer);
|
||||
|
||||
releaseResource(clusterResource, application,
|
||||
container.getResource(), node.getPartition());
|
||||
container.getResource(), node.getPartition(), rmContainer);
|
||||
LOG.info("completedContainer" +
|
||||
" container=" + container +
|
||||
" queue=" + this +
|
||||
|
@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
synchronized void allocateResource(Resource clusterResource,
|
||||
SchedulerApplicationAttempt application, Resource resource,
|
||||
String nodePartition) {
|
||||
String nodePartition, RMContainer rmContainer) {
|
||||
super.allocateResource(clusterResource, resource, nodePartition);
|
||||
|
||||
// handle ignore exclusivity container
|
||||
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
||||
RMNodeLabelsManager.NO_LABEL)
|
||||
&& !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
TreeSet<RMContainer> rmContainers = null;
|
||||
if (null == (rmContainers =
|
||||
ignorePartitionExclusivityRMContainers.get(nodePartition))) {
|
||||
rmContainers = new TreeSet<>();
|
||||
ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
|
||||
}
|
||||
rmContainers.add(rmContainer);
|
||||
}
|
||||
|
||||
// Update user metrics
|
||||
String userName = application.getUser();
|
||||
User user = getUser(userName);
|
||||
|
@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
}
|
||||
|
||||
synchronized void releaseResource(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Resource resource, String nodePartition) {
|
||||
synchronized void releaseResource(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Resource resource, String nodePartition,
|
||||
RMContainer rmContainer) {
|
||||
super.releaseResource(clusterResource, resource, nodePartition);
|
||||
|
||||
// handle ignore exclusivity container
|
||||
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
||||
RMNodeLabelsManager.NO_LABEL)
|
||||
&& !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
|
||||
Set<RMContainer> rmContainers =
|
||||
ignorePartitionExclusivityRMContainers.get(nodePartition);
|
||||
rmContainers.remove(rmContainer);
|
||||
if (rmContainers.isEmpty()) {
|
||||
ignorePartitionExclusivityRMContainers.remove(nodePartition);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update user metrics
|
||||
String userName = application.getUser();
|
||||
User user = getUser(userName);
|
||||
|
@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
FiCaSchedulerNode node =
|
||||
scheduler.getNode(rmContainer.getContainer().getNodeId());
|
||||
allocateResource(clusterResource, attempt, rmContainer.getContainer()
|
||||
.getResource(), node.getPartition());
|
||||
.getResource(), node.getPartition(), rmContainer);
|
||||
}
|
||||
getParent().recoverContainer(clusterResource, attempt, rmContainer);
|
||||
}
|
||||
|
@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
FiCaSchedulerNode node =
|
||||
scheduler.getNode(rmContainer.getContainer().getNodeId());
|
||||
allocateResource(clusterResource, application, rmContainer.getContainer()
|
||||
.getResource(), node.getPartition());
|
||||
.getResource(), node.getPartition(), rmContainer);
|
||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||
+ " resource=" + rmContainer.getContainer().getResource()
|
||||
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
|
||||
|
@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
FiCaSchedulerNode node =
|
||||
scheduler.getNode(rmContainer.getContainer().getNodeId());
|
||||
releaseResource(clusterResource, application, rmContainer.getContainer()
|
||||
.getResource(), node.getPartition());
|
||||
.getResource(), node.getPartition(), rmContainer);
|
||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||
+ " resource=" + rmContainer.getContainer().getResource()
|
||||
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
|
||||
|
@ -1982,6 +2021,17 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* return all ignored partition exclusivity RMContainers in the LeafQueue, this
|
||||
* will be used by preemption policy, and use of return
|
||||
* ignorePartitionExclusivityRMContainer should protected by LeafQueue
|
||||
* synchronized lock
|
||||
*/
|
||||
public synchronized Map<String, TreeSet<RMContainer>>
|
||||
getIgnoreExclusivityRMContainers() {
|
||||
return ignorePartitionExclusivityRMContainers;
|
||||
}
|
||||
|
||||
public void setCapacity(float capacity) {
|
||||
queueCapacities.setCapacity(capacity);
|
||||
}
|
||||
|
|
|
@ -18,16 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class AssignmentInformation {
|
||||
|
@ -117,4 +118,24 @@ public class AssignmentInformation {
|
|||
public List<AssignmentDetails> getReservationDetails() {
|
||||
return operationDetails.get(Operation.RESERVATION);
|
||||
}
|
||||
|
||||
private ContainerId getFirstContainerIdFromOperation(Operation op) {
|
||||
if (null != operationDetails.get(Operation.ALLOCATION)) {
|
||||
List<AssignmentDetails> assignDetails =
|
||||
operationDetails.get(Operation.ALLOCATION);
|
||||
if (!assignDetails.isEmpty()) {
|
||||
return assignDetails.get(0).containerId;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public ContainerId getFirstAllocatedOrReservedContainerId() {
|
||||
ContainerId containerId = null;
|
||||
containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
|
||||
if (null != containerId) {
|
||||
return containerId;
|
||||
}
|
||||
return getFirstContainerIdFromOperation(Operation.RESERVATION);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
|
|||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -37,27 +38,17 @@ import static org.mockito.Mockito.never;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.collections.map.HashedMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mortbay.log.Log;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestProportionalCapacityPreemptionPolicy {
|
||||
|
||||
|
@ -798,50 +793,6 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
setAMContainer = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdealAllocationForLabels() {
|
||||
int[][] qData = new int[][] {
|
||||
// / A B
|
||||
{ 80, 40, 40 }, // abs
|
||||
{ 80, 80, 80 }, // maxcap
|
||||
{ 80, 80, 0 }, // used
|
||||
{ 70, 20, 50 }, // pending
|
||||
{ 0, 0, 0 }, // reserved
|
||||
{ 5, 4, 1 }, // apps
|
||||
{ -1, 1, 1 }, // req granularity
|
||||
{ 2, 0, 0 }, // subqueues
|
||||
};
|
||||
setAMContainer = true;
|
||||
setLabeledContainer = true;
|
||||
Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
|
||||
NodeId node = NodeId.newInstance("node1", 0);
|
||||
Set<String> labelSet = new HashSet<String>();
|
||||
labelSet.add("x");
|
||||
labels.put(node, labelSet);
|
||||
when(lm.getNodeLabels()).thenReturn(labels);
|
||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||
// Subtracting Label X resources from cluster resources
|
||||
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
|
||||
Resources.clone(Resource.newInstance(80, 0)));
|
||||
clusterResources.setMemory(100);
|
||||
policy.editSchedule();
|
||||
|
||||
// By skipping AM Container and Labeled container, all other 18 containers
|
||||
// of appD will be
|
||||
// preempted
|
||||
verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
|
||||
|
||||
// By skipping AM Container and Labeled container, all other 18 containers
|
||||
// of appC will be
|
||||
// preempted
|
||||
verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||
|
||||
// rest 4 containers from appB will be preempted
|
||||
verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
||||
setAMContainer = false;
|
||||
setLabeledContainer = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptSkippedAMContainers() {
|
||||
int[][] qData = new int[][] {
|
||||
|
@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
clusterResources =
|
||||
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
|
||||
when(mCS.getClusterResource()).thenReturn(clusterResources);
|
||||
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
|
||||
clusterResources);
|
||||
|
||||
SchedulerNode mNode = mock(SchedulerNode.class);
|
||||
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
|
||||
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
|
||||
return policy;
|
||||
}
|
||||
|
||||
|
@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
float tot = leafAbsCapacities(abs, queues);
|
||||
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
|
||||
ParentQueue root = mockParentQueue(null, queues[0], pqs);
|
||||
when(root.getQueueName()).thenReturn("/");
|
||||
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
||||
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
|
||||
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
||||
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
||||
when(root.getQueuePath()).thenReturn("root");
|
||||
QueueCapacities rootQc = new QueueCapacities(true);
|
||||
rootQc.setAbsoluteUsedCapacity(used[0] / tot);
|
||||
rootQc.setAbsoluteCapacity(abs[0] / tot);
|
||||
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
|
||||
when(root.getQueueCapacities()).thenReturn(rootQc);
|
||||
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
|
||||
boolean preemptionDisabled = mockPreemptionStatus("root");
|
||||
when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
||||
|
||||
|
@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
|
||||
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
|
||||
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
|
||||
|
||||
// We need to make these fields to QueueCapacities
|
||||
QueueCapacities qc = new QueueCapacities(false);
|
||||
qc.setAbsoluteUsedCapacity(used[i] / tot);
|
||||
qc.setAbsoluteCapacity(abs[i] / tot);
|
||||
qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
|
||||
when(q.getQueueCapacities()).thenReturn(qc);
|
||||
|
||||
String parentPathName = p.getQueuePath();
|
||||
parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
||||
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
|
||||
|
@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
return pq;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
|
||||
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
|
||||
LeafQueue lq = mock(LeafQueue.class);
|
||||
|
@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
new ArrayList<ApplicationAttemptId>();
|
||||
when(lq.getTotalResourcePending()).thenReturn(
|
||||
Resource.newInstance(pending[i], 0));
|
||||
// need to set pending resource in resource usage as well
|
||||
ResourceUsage ru = new ResourceUsage();
|
||||
ru.setPending(Resource.newInstance(pending[i], 0));
|
||||
when(lq.getQueueResourceUsage()).thenReturn(ru);
|
||||
// consider moving where CapacityScheduler::comparator accessible
|
||||
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
||||
new Comparator<FiCaSchedulerApp>() {
|
||||
|
@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
when(mC.getContainerId()).thenReturn(cId);
|
||||
when(mC.getContainer()).thenReturn(c);
|
||||
when(mC.getApplicationAttemptId()).thenReturn(appAttId);
|
||||
when(mC.getAllocatedResource()).thenReturn(r);
|
||||
if (priority.AMCONTAINER.getValue() == cpriority) {
|
||||
when(mC.isAMContainer()).thenReturn(true);
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -138,7 +138,7 @@ public class TestChildQueueOrder {
|
|||
} else {
|
||||
FiCaSchedulerApp app1 = getMockApplication(0, "");
|
||||
((LeafQueue)queue).allocateResource(clusterResource, app1,
|
||||
allocatedResource, null);
|
||||
allocatedResource, null, null);
|
||||
}
|
||||
|
||||
// Next call - nothing
|
||||
|
|
|
@ -815,9 +815,9 @@ public class TestLeafQueue {
|
|||
qb.finishApplication(app_0.getApplicationId(), user_0);
|
||||
qb.finishApplication(app_2.getApplicationId(), user_1);
|
||||
qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority),
|
||||
null);
|
||||
null, null);
|
||||
qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority),
|
||||
null);
|
||||
null, null);
|
||||
|
||||
qb.setUserLimit(50);
|
||||
qb.setUserLimitFactor(1);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -1015,6 +1017,20 @@ public class TestNodeLabelContainerAllocation {
|
|||
// app1 gets all resource in partition=x
|
||||
Assert.assertEquals(10, schedulerNode1.getNumContainers());
|
||||
|
||||
// check non-exclusive containers of LeafQueue is correctly updated
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
|
||||
Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
|
||||
"y"));
|
||||
Assert.assertEquals(10,
|
||||
leafQueue.getIgnoreExclusivityRMContainers().get("x").size());
|
||||
|
||||
// completes all containers of app1, ignoreExclusivityRMContainers should be
|
||||
// updated as well.
|
||||
cs.handle(new AppAttemptRemovedSchedulerEvent(
|
||||
am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
|
||||
Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
|
||||
"x"));
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -150,7 +150,7 @@ public class TestParentQueue {
|
|||
} else {
|
||||
FiCaSchedulerApp app1 = getMockApplication(0, "");
|
||||
((LeafQueue)queue).allocateResource(clusterResource, app1,
|
||||
allocatedResource, null);
|
||||
allocatedResource, null, null);
|
||||
}
|
||||
|
||||
// Next call - nothing
|
||||
|
|
Loading…
Reference in New Issue