YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)

(cherry picked from commit 90dd3a8148)
This commit is contained in:
Wangda Tan 2016-10-31 15:18:31 -07:00
parent 9d13a13160
commit cef281abe6
18 changed files with 2550 additions and 413 deletions

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
*/
public class AbstractPreemptableResourceCalculator {
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
private boolean isReservedPreemptionCandidatesSelector;
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
private Resource clusterRes;
TQComparator(ResourceCalculator rc, Resource clusterRes) {
this.rc = rc;
this.clusterRes = clusterRes;
}
@Override
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
return -1;
}
if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
return 1;
}
return 0;
}
// Calculates idealAssigned / guaranteed
// TempQueues with 0 guarantees are always considered the most over
// capacity and therefore considered last for resources.
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
double pctOver = Integer.MAX_VALUE;
if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
Resources.none())) {
pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
q.getGuaranteed());
}
return (pctOver);
}
}
/**
* PreemptableResourceCalculator constructor.
*
* @param preemptionContext context
* @param isReservedPreemptionCandidatesSelector
* this will be set by different implementation of candidate
* selectors, please refer to TempQueuePerPartition#offer for
* details.
*/
public AbstractPreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
boolean isReservedPreemptionCandidatesSelector) {
context = preemptionContext;
rc = preemptionContext.getResourceCalculator();
this.isReservedPreemptionCandidatesSelector =
isReservedPreemptionCandidatesSelector;
}
/**
* Given a set of queues compute the fix-point distribution of unassigned
* resources among them. As pending request of a queue are exhausted, the
* queue is removed from the set and remaining capacity redistributed among
* remaining queues. The distribution is weighted based on guaranteed
* capacity, unless asked to ignoreGuarantee, in which case resources are
* distributed uniformly.
*
* @param totGuarant
* total guaranteed resource
* @param qAlloc
* List of child queues
* @param unassigned
* Unassigned resource per queue
* @param ignoreGuarantee
* ignore guarantee per queue.
*/
protected void computeFixpointAllocation(Resource totGuarant,
Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
// Prior to assigning the unused resources, process each queue as follows:
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
// Else idealAssigned = current;
// Subtract idealAssigned resources from unassigned.
// If the queue has all of its needs met (that is, if
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
TQComparator tqComparator = new TQComparator(rc, totGuarant);
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
TempQueuePerPartition q = i.next();
Resource used = q.getUsed();
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
} else {
q.idealAssigned = Resources.clone(used);
}
Resources.subtractFrom(unassigned, q.idealAssigned);
// If idealAssigned < (allocated + used + pending), q needs more
// resources, so
// add it to the list of underserved queues, ordered by need.
Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
}
// assign all cluster resources until no more demand, or no resources are
// left
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
unassigned, Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
// normalized guarantee. After the offer, if the queue is not satisfied,
// place it back in the ordered list of queues, recalculating its place
// in the order of most under-guaranteed to most over-guaranteed. In this
// way, the most underserved queue(s) are always given resources first.
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
orderedByNeed, tqComparator);
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
TempQueuePerPartition sub = i.next();
Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
isReservedPreemptionCandidatesSelector);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
// The queue is still asking for more. Put it back in the priority
// queue, recalculating its order based on need.
orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
Resources.subtractFrom(unassigned, wQassigned);
}
// Sometimes its possible that, all queues are properly served. So intra
// queue preemption will not try for any preemption. How ever there are
// chances that within a queue, there are some imbalances. Hence make sure
// all queues are added to list.
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
}
}
/**
* Computes a normalizedGuaranteed capacity based on active queues.
*
* @param clusterResource
* the total amount of resources in the cluster
* @param queues
* the list of queues to consider
* @param ignoreGuar
* ignore guarantee.
*/
private void resetCapacity(Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = 1.0f / queues.size();
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
}
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.getGuaranteed(), activeCap);
}
}
}
// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
private Collection<TempQueuePerPartition> getMostUnderservedQueues(
PriorityQueue<TempQueuePerPartition> orderedByNeed,
TQComparator tqComparator) {
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
underserved.add(q1);
// Add underserved queues in order for later uses
context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
TempQueuePerPartition q2 = orderedByNeed.peek();
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
// return what has already been collected. Otherwise, q1's pct of
// guaranteed == that of q2, so add q2 to underserved list during the
// next pass.
if (q2 == null || tqComparator.compare(q1, q2) < 0) {
if (null != q2) {
context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
}
return underserved;
}
}
return underserved;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Abstract temporary data-structure for tracking resource availability,pending
* resource need, current utilization for app/queue.
*/
public class AbstractPreemptionEntity {
// Following fields are copied from scheduler
final String queueName;
protected final Resource current;
protected final Resource amUsed;
protected final Resource reserved;
protected Resource pending;
// Following fields are settled and used by candidate selection policies
Resource idealAssigned;
Resource toBePreempted;
Resource selected;
private Resource actuallyToBePreempted;
private Resource toBePreemptFromOther;
AbstractPreemptionEntity(String queueName, Resource usedPerPartition,
Resource amUsedPerPartition, Resource reserved,
Resource pendingPerPartition) {
this.queueName = queueName;
this.current = usedPerPartition;
this.pending = pendingPerPartition;
this.reserved = reserved;
this.amUsed = amUsedPerPartition;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyToBePreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
this.toBePreemptFromOther = Resource.newInstance(0, 0);
this.selected = Resource.newInstance(0, 0);
}
public Resource getUsed() {
return current;
}
public Resource getUsedDeductAM() {
return Resources.subtract(current, amUsed);
}
public Resource getAMUsed() {
return amUsed;
}
public Resource getPending() {
return pending;
}
public Resource getReserved() {
return reserved;
}
public Resource getActuallyToBePreempted() {
return actuallyToBePreempted;
}
public void setActuallyToBePreempted(Resource actuallyToBePreempted) {
this.actuallyToBePreempted = actuallyToBePreempted;
}
public Resource getToBePreemptFromOther() {
return toBePreemptFromOther;
}
public void setToBePreemptFromOther(Resource toBePreemptFromOther) {
this.toBePreemptFromOther = toBePreemptFromOther;
}
}

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
interface CapacitySchedulerPreemptionContext { interface CapacitySchedulerPreemptionContext {
@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext {
Set<String> getLeafQueueNames(); Set<String> getLeafQueueNames();
Set<String> getAllPartitions(); Set<String> getAllPartitions();
int getClusterMaxApplicationPriority();
Resource getPartitionResource(String partition);
LinkedHashSet<String> getUnderServedQueuesPerPartition(String partition);
void addPartitionToUnderServedQueues(String queueName, String partition);
float getMinimumThresholdForIntraQueuePreemption();
float getMaxAllowableLimitForIntraQueuePreemption();
} }

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils {
continue; continue;
} }
// Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 // Only add resToObtainByPartition when actuallyToBePreempted resource >=
// 0
if (Resources.greaterThan(context.getResourceCalculator(), if (Resources.greaterThan(context.getResourceCalculator(),
clusterResource, qT.getActuallyToBePreempted(), Resources.none())) { clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
resToObtainByPartition.put(qT.partition, resToObtainByPartition.put(qT.partition,
@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils {
return false; return false;
} }
Set<RMContainer> containers = selectedCandidates.get( Set<RMContainer> containers = selectedCandidates
container.getApplicationAttemptId()); .get(container.getApplicationAttemptId());
if (containers == null) { if (containers == null) {
return false; return false;
} }
@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils {
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) { Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
for (Set<RMContainer> containers : selectedCandidates.values()) { for (Set<RMContainer> containers : selectedCandidates.values()) {
for (RMContainer c : containers) { for (RMContainer c : containers) {
SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode( SchedulerNode schedulerNode = context.getScheduler()
c.getAllocatedNode()); .getSchedulerNode(c.getAllocatedNode());
if (null == schedulerNode) { if (null == schedulerNode) {
continue; continue;
} }
@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils {
if (null != res) { if (null != res) {
tq.deductActuallyToBePreempted(context.getResourceCalculator(), tq.deductActuallyToBePreempted(context.getResourceCalculator(),
tq.totalPartitionResource, res); tq.totalPartitionResource, res);
Collection<TempAppPerPartition> tas = tq.getApps();
if (null == tas || tas.isEmpty()) {
continue;
}
deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
tas, res, partition);
} }
} }
} }
} }
private static void deductPreemptableResourcePerApp(
CapacitySchedulerPreemptionContext context,
Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
Resource res, String partition) {
for (TempAppPerPartition ta : tas) {
ta.deductActuallyToBePreempted(context.getResourceCalculator(),
totalPartitionResource, res, partition);
}
}
/**
* Invoke this method to preempt container based on resToObtain.
*
* @param rc
* resource calculator
* @param context
* preemption context
* @param resourceToObtainByPartitions
* map to hold resource to obtain per partition
* @param rmContainer
* container
* @param clusterResource
* total resource
* @param preemptMap
* map to hold preempted containers
* @param totalPreemptionAllowed
* total preemption allowed per round
* @return should we preempt rmContainer. If we should, deduct from
* <code>resourceToObtainByPartition</code>
*/
public static boolean tryPreemptContainerAndDeductResToObtain(
ResourceCalculator rc, CapacitySchedulerPreemptionContext context,
Map<String, Resource> resourceToObtainByPartitions,
RMContainer rmContainer, Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
Resource totalPreemptionAllowed) {
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
// We will not account resource of a container twice or more
if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
return false;
}
String nodePartition = getPartitionByNodeId(context,
rmContainer.getAllocatedNode());
Resource toObtainByPartition = resourceToObtainByPartitions
.get(nodePartition);
if (null != toObtainByPartition
&& Resources.greaterThan(rc, clusterResource, toObtainByPartition,
Resources.none())
&& Resources.fitsIn(rc, clusterResource,
rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
Resources.subtractFrom(toObtainByPartition,
rmContainer.getAllocatedResource());
Resources.subtractFrom(totalPreemptionAllowed,
rmContainer.getAllocatedResource());
// When we have no more resource need to obtain, remove from map.
if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
Resources.none())) {
resourceToObtainByPartitions.remove(nodePartition);
}
// Add to preemptMap
addToPreemptMap(preemptMap, attemptId, rmContainer);
return true;
}
return false;
}
private static String getPartitionByNodeId(
CapacitySchedulerPreemptionContext context, NodeId nodeId) {
return context.getScheduler().getSchedulerNode(nodeId).getPartition();
}
private static void addToPreemptMap(
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
Set<RMContainer> set = preemptMap.get(appAttemptId);
if (null == set) {
set = new HashSet<>();
preemptMap.put(appAttemptId, set);
}
set.add(containerToPreempt);
}
private static boolean preemptMapContains(
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
ApplicationAttemptId attemptId, RMContainer rmContainer) {
Set<RMContainer> rmContainers = preemptMap.get(attemptId);
if (null == rmContainers) {
return false;
}
return rmContainers.contains(rmContainer);
}
} }

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -33,9 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -111,9 +106,11 @@ public class FifoCandidatesSelector
// Skip already selected containers // Skip already selected containers
continue; continue;
} }
boolean preempted = tryPreemptContainerAndDeductResToObtain( boolean preempted = CapacitySchedulerPreemptionUtils
resToObtainByPartition, c, clusterResource, selectedCandidates, .tryPreemptContainerAndDeductResToObtain(rc,
totalPreemptionAllowed); preemptionContext, resToObtainByPartition, c,
clusterResource, selectedCandidates,
totalPreemptionAllowed);
if (!preempted) { if (!preempted) {
continue; continue;
} }
@ -184,9 +181,10 @@ public class FifoCandidatesSelector
break; break;
} }
boolean preempted = boolean preempted = CapacitySchedulerPreemptionUtils
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
clusterResource, preemptMap, totalPreemptionAllowed); resToObtainByPartition, c, clusterResource, preemptMap,
totalPreemptionAllowed);
if (preempted) { if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
} }
@ -194,68 +192,6 @@ public class FifoCandidatesSelector
skippedAMContainerlist.clear(); skippedAMContainerlist.clear();
} }
private boolean preemptMapContains(
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
ApplicationAttemptId attemptId, RMContainer rmContainer) {
Set<RMContainer> rmContainers;
if (null == (rmContainers = preemptMap.get(attemptId))) {
return false;
}
return rmContainers.contains(rmContainer);
}
/**
* Return should we preempt rmContainer. If we should, deduct from
* <code>resourceToObtainByPartition</code>
*/
private boolean tryPreemptContainerAndDeductResToObtain(
Map<String, Resource> resourceToObtainByPartitions,
RMContainer rmContainer, Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
Resource totalPreemptionAllowed) {
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
// We will not account resource of a container twice or more
if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
return false;
}
String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
Resource toObtainByPartition =
resourceToObtainByPartitions.get(nodePartition);
if (null != toObtainByPartition && Resources.greaterThan(rc,
clusterResource, toObtainByPartition, Resources.none()) && Resources
.fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
totalPreemptionAllowed)) {
Resources.subtractFrom(toObtainByPartition,
rmContainer.getAllocatedResource());
Resources.subtractFrom(totalPreemptionAllowed,
rmContainer.getAllocatedResource());
// When we have no more resource need to obtain, remove from map.
if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
Resources.none())) {
resourceToObtainByPartitions.remove(nodePartition);
}
if (LOG.isDebugEnabled()) {
LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
.getContainerId() + " from partition=" + nodePartition + " queue="
+ rmContainer.getQueueName() + " to be preemption candidates");
}
// Add to preemptMap
addToPreemptMap(preemptMap, attemptId, rmContainer);
return true;
}
return false;
}
private String getPartitionByNodeId(NodeId nodeId) {
return preemptionContext.getScheduler().getSchedulerNode(nodeId)
.getPartition();
}
/** /**
* Given a target preemption for a specific application, select containers * Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app). * to preempt (after unreserving all reservation for that app).
@ -267,10 +203,6 @@ public class FifoCandidatesSelector
Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers, Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
Resource totalPreemptionAllowed) { Resource totalPreemptionAllowed) {
ApplicationAttemptId appId = app.getApplicationAttemptId(); ApplicationAttemptId appId = app.getApplicationAttemptId();
if (LOG.isDebugEnabled()) {
LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+ " resourceToObtain=" + resToObtainByPartition);
}
// first drop reserved containers towards rsrcPreempt // first drop reserved containers towards rsrcPreempt
List<RMContainer> reservedContainers = List<RMContainer> reservedContainers =
@ -285,8 +217,9 @@ public class FifoCandidatesSelector
} }
// Try to preempt this container // Try to preempt this container
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
clusterResource, selectedContainers, totalPreemptionAllowed); rc, preemptionContext, resToObtainByPartition, c, clusterResource,
selectedContainers, totalPreemptionAllowed);
if (!preemptionContext.isObserveOnly()) { if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler() preemptionContext.getRMContext().getDispatcher().getEventHandler()
@ -327,39 +260,9 @@ public class FifoCandidatesSelector
} }
// Try to preempt this container // Try to preempt this container
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
clusterResource, selectedContainers, totalPreemptionAllowed); rc, preemptionContext, resToObtainByPartition, c, clusterResource,
selectedContainers, totalPreemptionAllowed);
} }
} }
/**
* Compare by reversed priority order first, and then reversed containerId
* order
* @param containers
*/
@VisibleForTesting
static void sortContainers(List<RMContainer> containers){
Collections.sort(containers, new Comparator<RMContainer>() {
@Override
public int compare(RMContainer a, RMContainer b) {
int schedKeyComp = b.getAllocatedSchedulerKey()
.compareTo(a.getAllocatedSchedulerKey());
if (schedKeyComp != 0) {
return schedKeyComp;
}
return b.getContainerId().compareTo(a.getContainerId());
}
});
}
private void addToPreemptMap(
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
Set<RMContainer> set;
if (null == (set = preemptMap.get(appAttemptId))) {
set = new HashSet<>();
preemptMap.put(appAttemptId, set);
}
set.add(containerToPreempt);
}
} }

View File

@ -0,0 +1,459 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for
* priority and user-limit.
*/
public class FifoIntraQueuePreemptionPlugin
implements
IntraQueuePreemptionComputePlugin {
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
private static final Log LOG =
LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class);
public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
CapacitySchedulerPreemptionContext preemptionContext) {
this.context = preemptionContext;
this.rc = rc;
}
@Override
public Map<String, Resource> getResourceDemandFromAppsPerQueue(
String queueName, String partition) {
Map<String, Resource> resToObtainByPartition = new HashMap<>();
TempQueuePerPartition tq = context
.getQueueByPartition(queueName, partition);
Collection<TempAppPerPartition> appsOrderedByPriority = tq.getApps();
Resource actualPreemptNeeded = resToObtainByPartition.get(partition);
// Updating pending resource per-partition level.
if (actualPreemptNeeded == null) {
actualPreemptNeeded = Resources.createResource(0, 0);
resToObtainByPartition.put(partition, actualPreemptNeeded);
}
for (TempAppPerPartition a1 : appsOrderedByPriority) {
Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Selected to preempt " + actualPreemptNeeded
+ " resource from partition:" + partition);
}
return resToObtainByPartition;
}
@Override
public void computeAppsIdealAllocation(Resource clusterResource,
Resource partitionBasedResource, TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptedResourceAllowed,
Resource queueReassignableResource, float maxAllowablePreemptLimit) {
// 1. AM used resource can be considered as a frozen resource for now.
// Hence such containers in a queue can be omitted from the preemption
// calculation.
Map<String, Resource> perUserAMUsed = new HashMap<String, Resource>();
Resource amUsed = calculateUsedAMResourcesPerQueue(tq.partition,
tq.leafQueue, perUserAMUsed);
Resources.subtractFrom(queueReassignableResource, amUsed);
// 2. tq.leafQueue will not be null as we validated it in caller side
Collection<FiCaSchedulerApp> apps = tq.leafQueue.getAllApplications();
// We do not need preemption for a single app
if (apps.size() == 1) {
return;
}
// 3. Create all tempApps for internal calculation and return a list from
// high priority to low priority order.
TAPriorityComparator taComparator = new TAPriorityComparator();
PriorityQueue<TempAppPerPartition> orderedByPriority =
createTempAppForResCalculation(tq.partition, apps, taComparator);
// 4. Calculate idealAssigned per app by checking based on queue's
// unallocated resource.Also return apps arranged from lower priority to
// higher priority.
TreeSet<TempAppPerPartition> orderedApps =
calculateIdealAssignedResourcePerApp(clusterResource,
partitionBasedResource, tq, selectedCandidates,
queueReassignableResource, orderedByPriority, perUserAMUsed);
// 5. A configurable limit that could define an ideal allowable preemption
// limit. Based on current queue's capacity,defined how much % could become
// preemptable.
Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(),
maxAllowablePreemptLimit);
if (Resources.greaterThan(rc, clusterResource, maxIntraQueuePreemptable,
tq.getActuallyToBePreempted())) {
Resources.subtractFrom(maxIntraQueuePreemptable,
tq.getActuallyToBePreempted());
} else {
maxIntraQueuePreemptable = Resource.newInstance(0, 0);
}
// 6. We have two configurations here, one is intra queue limit and second
// one is per-round limit for any time preemption. Take a minimum of these
Resource preemptionLimit = Resources.min(rc, clusterResource,
maxIntraQueuePreemptable, totalPreemptedResourceAllowed);
// 7. From lowest priority app onwards, calculate toBePreempted resource
// based on demand.
calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
preemptionLimit);
// Save all apps (low to high) to temp queue for further reference
tq.addAllApps(orderedApps);
// 8. There are chances that we may preempt for the demand from same
// priority level, such cases are to be validated out.
validateOutSameAppPriorityFromDemand(clusterResource,
(TreeSet<TempAppPerPartition>) tq.getApps());
if (LOG.isDebugEnabled()) {
LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
for (TempAppPerPartition tmpApp : tq.getApps()) {
LOG.debug(tmpApp);
}
}
}
private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
for (TempAppPerPartition tmpApp : orderedApps) {
if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit,
Resources.none())
|| Resources.lessThanOrEqual(rc, clusterResource, tmpApp.getUsed(),
Resources.none())) {
continue;
}
Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
tmpApp.idealAssigned);
Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
// Calculate toBePreempted from apps as follows:
// app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
// intra_q_preemptable)
tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
.max(rc, clusterResource, preemtableFromApp, Resources.none()),
preemptionLimit);
preemptionLimit = Resources.subtract(preemptionLimit,
tmpApp.toBePreempted);
}
}
/**
* Algorithm for calculating idealAssigned is as follows:
* For each partition:
* Q.reassignable = Q.used - Q.selected;
*
* # By default set ideal assigned 0 for app.
* app.idealAssigned as 0
* # get user limit from scheduler.
* userLimitRes = Q.getUserLimit(userName)
*
* # initial all value to 0
* Map<String, Resource> userToAllocated
*
* # Loop from highest priority to lowest priority app to calculate ideal
* for app in sorted-by(priority) {
* if Q.reassignable < 0:
* break;
*
* if (user-to-allocated.get(app.user) < userLimitRes) {
* idealAssigned = min((userLimitRes - userToAllocated.get(app.user)),
* (app.used + app.pending - app.selected))
* app.idealAssigned = min(Q.reassignable, idealAssigned)
* userToAllocated.get(app.user) += app.idealAssigned;
* } else {
* // skip this app because user-limit reached
* }
* Q.reassignable -= app.idealAssigned
* }
*
* @param clusterResource Cluster Resource
* @param partitionBasedResource resource per partition
* @param tq TempQueue
* @param selectedCandidates Already Selected preemption candidates
* @param queueReassignableResource Resource used in a queue
* @param orderedByPriority List of running apps
* @param perUserAMUsed AM used resource
* @return List of temp apps ordered from low to high priority
*/
private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
Resource clusterResource, Resource partitionBasedResource,
TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource queueReassignableResource,
PriorityQueue<TempAppPerPartition> orderedByPriority,
Map<String, Resource> perUserAMUsed) {
Comparator<TempAppPerPartition> reverseComp = Collections
.reverseOrder(new TAPriorityComparator());
TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
String partition = tq.partition;
Map<String, Resource> preCalculatedUserLimit =
new HashMap<String, Resource>();
while (!orderedByPriority.isEmpty()) {
// Remove app from the next highest remaining priority and process it to
// calculate idealAssigned per app.
TempAppPerPartition tmpApp = orderedByPriority.remove();
orderedApps.add(tmpApp);
// Once unallocated resource is 0, we can stop assigning ideal per app.
if (Resources.lessThanOrEqual(rc, clusterResource,
queueReassignableResource, Resources.none())) {
continue;
}
String userName = tmpApp.app.getUser();
Resource userLimitResource = preCalculatedUserLimit.get(userName);
// Verify whether we already calculated headroom for this user.
if (userLimitResource == null) {
userLimitResource = Resources.clone(tq.leafQueue
.getUserLimitPerUser(userName, partitionBasedResource, partition));
Resource amUsed = perUserAMUsed.get(userName);
if (null == amUsed) {
amUsed = Resources.createResource(0, 0);
}
// Real AM used need not have to be considered for user-limit as well.
userLimitResource = Resources.subtract(userLimitResource, amUsed);
if (LOG.isDebugEnabled()) {
LOG.debug("Userlimit for user '" + userName + "' is :"
+ userLimitResource + ", and amUsed is:" + amUsed);
}
preCalculatedUserLimit.put(userName, userLimitResource);
}
Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
if (idealAssignedForUser == null) {
idealAssignedForUser = Resources.createResource(0, 0);
userIdealAssignedMapping.put(userName, idealAssignedForUser);
}
// Calculate total selected container resources from current app.
getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
tmpApp, partition);
// For any app, used+pending will give its idealAssigned. However it will
// be tightly linked to queue's unallocated quota. So lower priority apps
// idealAssigned may fall to 0 if higher priority apps demand is more.
Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(),
tmpApp.getPending());
Resources.subtractFrom(appIdealAssigned, tmpApp.selected);
if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
userLimitResource)) {
appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
Resources.subtract(userLimitResource, idealAssignedForUser));
tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
clusterResource, queueReassignableResource, appIdealAssigned));
Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
} else {
continue;
}
// Also set how much resource is needed by this app from others.
Resource appUsedExcludedSelected = Resources
.subtract(tmpApp.getUsedDeductAM(), tmpApp.selected);
if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned,
appUsedExcludedSelected)) {
tmpApp.setToBePreemptFromOther(
Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
}
Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
}
return orderedApps;
}
/*
* Previous policies would have already selected few containers from an
* application. Calculate total resource from these selected containers.
*/
private void getAlreadySelectedPreemptionCandidatesResource(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
TempAppPerPartition tmpApp, String partition) {
tmpApp.selected = Resources.createResource(0, 0);
Set<RMContainer> containers = selectedCandidates
.get(tmpApp.app.getApplicationAttemptId());
if (containers == null) {
return;
}
for (RMContainer cont : containers) {
if (partition.equals(cont.getNodeLabelExpression())) {
Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
}
}
}
private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
String partition, Collection<FiCaSchedulerApp> apps,
TAPriorityComparator taComparator) {
PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
100, taComparator);
// have an internal temp app structure to store intermediate data(priority)
for (FiCaSchedulerApp app : apps) {
Resource used = app.getAppAttemptResourceUsage().getUsed(partition);
Resource amUsed = null;
if (!app.isWaitingForAMContainer()) {
amUsed = app.getAMResource(partition);
}
Resource pending = app.getTotalPendingRequestsPerPartition()
.get(partition);
Resource reserved = app.getAppAttemptResourceUsage()
.getReserved(partition);
used = (used == null) ? Resources.createResource(0, 0) : used;
amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed;
pending = (pending == null) ? Resources.createResource(0, 0) : pending;
reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;
HashSet<String> partitions = new HashSet<String>(
app.getAppAttemptResourceUsage().getNodePartitionsSet());
partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet());
// Create TempAppPerQueue for further calculation.
TempAppPerPartition tmpApp = new TempAppPerPartition(app,
Resources.clone(used), Resources.clone(amUsed),
Resources.clone(reserved), Resources.clone(pending));
// Set ideal allocation of app as 0.
tmpApp.idealAssigned = Resources.createResource(0, 0);
orderedByPriority.add(tmpApp);
}
return orderedByPriority;
}
/*
* Fifo+Priority based preemption policy need not have to preempt resources at
* same priority level. Such cases will be validated out.
*/
public void validateOutSameAppPriorityFromDemand(Resource cluster,
TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
TempAppPerPartition[] apps = appsOrderedfromLowerPriority
.toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
if (apps.length <= 0) {
return;
}
int lPriority = 0;
int hPriority = apps.length - 1;
while (lPriority < hPriority
&& !apps[lPriority].equals(apps[hPriority])
&& apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
Resource toPreemptFromOther = apps[hPriority]
.getToBePreemptFromOther();
Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
actuallyToPreempt);
if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
Resource toPreempt = Resources.min(rc, cluster,
toPreemptFromOther, delta);
apps[hPriority].setToBePreemptFromOther(
Resources.subtract(toPreemptFromOther, toPreempt));
apps[lPriority].setActuallyToBePreempted(
Resources.add(actuallyToPreempt, toPreempt));
}
if (Resources.lessThanOrEqual(rc, cluster,
apps[lPriority].toBePreempted,
apps[lPriority].getActuallyToBePreempted())) {
lPriority++;
continue;
}
if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
Resources.none())) {
hPriority--;
continue;
}
}
}
private Resource calculateUsedAMResourcesPerQueue(String partition,
LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
Resource amUsed = Resources.createResource(0, 0);
for (FiCaSchedulerApp app : runningApps) {
Resource userAMResource = perUserAMUsed.get(app.getUser());
if (null == userAMResource) {
userAMResource = Resources.createResource(0, 0);
perUserAMUsed.put(app.getUser(), userAMResource);
}
Resources.addTo(userAMResource, app.getAMResource(partition));
Resources.addTo(amUsed, app.getAMResource(partition));
}
return amUsed;
}
}

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Identifies over utilized resources within a queue and tries to normalize
* them to resolve resource allocation anomalies w.r.t priority and user-limit.
*/
public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
@SuppressWarnings("serial")
static class TAPriorityComparator
implements
Serializable,
Comparator<TempAppPerPartition> {
@Override
public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
Priority p1 = Priority.newInstance(tq1.getPriority());
Priority p2 = Priority.newInstance(tq2.getPriority());
if (!p1.equals(p2)) {
return p1.compareTo(p2);
}
return tq1.getApplicationId().compareTo(tq2.getApplicationId());
}
}
IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
final CapacitySchedulerPreemptionContext context;
private static final Log LOG =
LogFactory.getLog(IntraQueueCandidatesSelector.class);
IntraQueueCandidatesSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
super(preemptionContext);
fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc,
preemptionContext);
context = preemptionContext;
}
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed) {
// 1. Calculate the abnormality within each queue one by one.
computeIntraQueuePreemptionDemand(
clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
// 2. Previous selectors (with higher priority) could have already
// selected containers. We need to deduct pre-emptable resources
// based on already selected candidates.
CapacitySchedulerPreemptionUtils
.deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
selectedCandidates);
// 3. Loop through all partitions to select containers for preemption.
for (String partition : preemptionContext.getAllPartitions()) {
LinkedHashSet<String> queueNames = preemptionContext
.getUnderServedQueuesPerPartition(partition);
// Error check to handle non-mapped labels to queue.
if (null == queueNames) {
continue;
}
// 4. Iterate from most under-served queue in order.
for (String queueName : queueNames) {
LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
RMNodeLabelsManager.NO_LABEL).leafQueue;
// skip if not a leafqueue
if (null == leafQueue) {
continue;
}
// 5. Calculate the resource to obtain per partition
Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
.getResourceDemandFromAppsPerQueue(queueName, partition);
// 6. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption.
synchronized (leafQueue) {
Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
.getPreemptionIterator();
while (desc.hasNext()) {
FiCaSchedulerApp app = desc.next();
preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
totalPreemptedResourceAllowed, resToObtainByPartition,
leafQueue, app);
}
}
}
}
return selectedCandidates;
}
private void preemptFromLeastStarvedApp(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed,
Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
FiCaSchedulerApp app) {
// ToDo: Reuse reservation selector here.
List<RMContainer> liveContainers = new ArrayList<>(
app.getLiveContainers());
sortContainers(liveContainers);
if (LOG.isDebugEnabled()) {
LOG.debug(
"totalPreemptedResourceAllowed for preemption at this round is :"
+ totalPreemptedResourceAllowed);
}
for (RMContainer c : liveContainers) {
// if there are no demand, return.
if (resToObtainByPartition.isEmpty()) {
return;
}
// skip preselected containers.
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
selectedCandidates)) {
continue;
}
// Skip already marked to killable containers
if (null != preemptionContext.getKillableContainers() && preemptionContext
.getKillableContainers().contains(c.getContainerId())) {
continue;
}
// Skip AM Container from preemption for now.
if (c.isAMContainer()) {
continue;
}
// Try to preempt this container
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
selectedCandidates, totalPreemptedResourceAllowed);
}
}
private void computeIntraQueuePreemptionDemand(Resource clusterResource,
Resource totalPreemptedResourceAllowed,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
// 1. Iterate through all partition to calculate demand within a partition.
for (String partition : context.getAllPartitions()) {
LinkedHashSet<String> queueNames = context
.getUnderServedQueuesPerPartition(partition);
if (null == queueNames) {
continue;
}
// 2. Its better to get partition based resource limit earlier before
// starting calculation
Resource partitionBasedResource =
context.getPartitionResource(partition);
// 3. loop through all queues corresponding to a partition.
for (String queueName : queueNames) {
TempQueuePerPartition tq = context.getQueueByPartition(queueName,
partition);
LeafQueue leafQueue = tq.leafQueue;
// skip if its parent queue
if (null == leafQueue) {
continue;
}
// 4. Consider reassignableResource as (used - actuallyToBePreempted).
// This provides as upper limit to split apps quota in a queue.
Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
tq.getActuallyToBePreempted());
// 5. Check queue's used capacity. Make sure that the used capacity is
// above certain limit to consider for intra queue preemption.
if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
.getMinimumThresholdForIntraQueuePreemption()) {
continue;
}
// 6. compute the allocation of all apps based on queue's unallocated
// capacity
fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
partitionBasedResource, tq, selectedCandidates,
totalPreemptedResourceAllowed,
queueReassignableResource,
context.getMaxAllowableLimitForIntraQueuePreemption());
}
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
interface IntraQueuePreemptionComputePlugin {
Map<String, Resource> getResourceDemandFromAppsPerQueue(String queueName,
String partition);
void computeAppsIdealAllocation(Resource clusterResource,
Resource partitionBasedResource, TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
float maxAllowablePreemptLimit);
}

View File

@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
/** /**
* Calculate how much resources need to be preempted for each queue, * Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector} * will be used by {@link PreemptionCandidatesSelector}
*/ */
public class PreemptableResourceCalculator { public class PreemptableResourceCalculator
extends
AbstractPreemptableResourceCalculator {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(PreemptableResourceCalculator.class); LogFactory.getLog(PreemptableResourceCalculator.class);
private final CapacitySchedulerPreemptionContext context;
private final ResourceCalculator rc;
private boolean isReservedPreemptionCandidatesSelector; private boolean isReservedPreemptionCandidatesSelector;
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
private Resource clusterRes;
TQComparator(ResourceCalculator rc, Resource clusterRes) {
this.rc = rc;
this.clusterRes = clusterRes;
}
@Override
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
return -1;
}
if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
return 1;
}
return 0;
}
// Calculates idealAssigned / guaranteed
// TempQueues with 0 guarantees are always considered the most over
// capacity and therefore considered last for resources.
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
double pctOver = Integer.MAX_VALUE;
if (q != null && Resources.greaterThan(rc, clusterRes,
q.getGuaranteed(),
Resources.none())) {
pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
q.getGuaranteed());
}
return (pctOver);
}
}
/** /**
* PreemptableResourceCalculator constructor * PreemptableResourceCalculator constructor
* *
@ -93,136 +54,7 @@ public class PreemptableResourceCalculator {
public PreemptableResourceCalculator( public PreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext, CapacitySchedulerPreemptionContext preemptionContext,
boolean isReservedPreemptionCandidatesSelector) { boolean isReservedPreemptionCandidatesSelector) {
context = preemptionContext; super(preemptionContext, isReservedPreemptionCandidatesSelector);
rc = preemptionContext.getResourceCalculator();
this.isReservedPreemptionCandidatesSelector =
isReservedPreemptionCandidatesSelector;
}
/**
* Computes a normalizedGuaranteed capacity based on active queues
* @param rc resource calculator
* @param clusterResource the total amount of resources in the cluster
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = 1.0f / queues.size();
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
}
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.getGuaranteed(), activeCap);
}
}
}
// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
PriorityQueue<TempQueuePerPartition> orderedByNeed,
TQComparator tqComparator) {
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
underserved.add(q1);
TempQueuePerPartition q2 = orderedByNeed.peek();
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
// return what has already been collected. Otherwise, q1's pct of
// guaranteed == that of q2, so add q2 to underserved list during the
// next pass.
if (q2 == null || tqComparator.compare(q1,q2) < 0) {
return underserved;
}
}
return underserved;
}
/**
* Given a set of queues compute the fix-point distribution of unassigned
* resources among them. As pending request of a queue are exhausted, the
* queue is removed from the set and remaining capacity redistributed among
* remaining queues. The distribution is weighted based on guaranteed
* capacity, unless asked to ignoreGuarantee, in which case resources are
* distributed uniformly.
*/
private void computeFixpointAllocation(ResourceCalculator rc,
Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
Resource unassigned, boolean ignoreGuarantee) {
// Prior to assigning the unused resources, process each queue as follows:
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
// Else idealAssigned = current;
// Subtract idealAssigned resources from unassigned.
// If the queue has all of its needs met (that is, if
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
TempQueuePerPartition q = i.next();
Resource used = q.getUsed();
if (Resources.greaterThan(rc, tot_guarant, used,
q.getGuaranteed())) {
q.idealAssigned = Resources.add(
q.getGuaranteed(), q.untouchableExtra);
} else {
q.idealAssigned = Resources.clone(used);
}
Resources.subtractFrom(unassigned, q.idealAssigned);
// If idealAssigned < (allocated + used + pending), q needs more resources, so
// add it to the list of underserved queues, ordered by need.
Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
}
//assign all cluster resources until no more demand, or no resources are left
while (!orderedByNeed.isEmpty()
&& Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
// normalized guarantee. After the offer, if the queue is not satisfied,
// place it back in the ordered list of queues, recalculating its place
// in the order of most under-guaranteed to most over-guaranteed. In this
// way, the most underserved queue(s) are always given resources first.
Collection<TempQueuePerPartition> underserved =
getMostUnderservedQueues(orderedByNeed, tqComparator);
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
TempQueuePerPartition sub = i.next();
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
isReservedPreemptionCandidatesSelector);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
// The queue is still asking for more. Put it back in the priority
// queue, recalculating its order based on need.
orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
Resources.subtractFrom(unassigned, wQassigned);
}
} }
/** /**
@ -263,14 +95,14 @@ public class PreemptableResourceCalculator {
} }
// first compute the allocation as a fixpoint based on guaranteed capacity // first compute the allocation as a fixpoint based on guaranteed capacity
computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned, computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
false); false);
// if any capacity is left unassigned, distributed among zero-guarantee // if any capacity is left unassigned, distributed among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero) // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
if (!zeroGuarQueues.isEmpty() if (!zeroGuarQueues.isEmpty()
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) { && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned, computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
true); true);
} }
@ -321,13 +153,12 @@ public class PreemptableResourceCalculator {
computeIdealResourceDistribution(rc, root.getChildren(), computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned); totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs // compute recursively for lower levels and build list of leafs
for(TempQueuePerPartition t : root.getChildren()) { for (TempQueuePerPartition t : root.getChildren()) {
recursivelyComputeIdealAssignment(t, totalPreemptionAllowed); recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
} }
} }
} }
private void calculateResToObtainByPartitionForLeafQueues( private void calculateResToObtainByPartitionForLeafQueues(
Set<String> leafQueueNames, Resource clusterResource) { Set<String> leafQueueNames, Resource clusterResource) {
// Loop all leaf queues // Loop all leaf queues

View File

@ -23,6 +23,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -41,7 +46,7 @@ public abstract class PreemptionCandidatesSelector {
* selected candidates. * selected candidates.
* *
* @param selectedCandidates already selected candidates from previous policies * @param selectedCandidates already selected candidates from previous policies
* @param clusterResource * @param clusterResource total resource
* @param totalPreemptedResourceAllowed how many resources allowed to be * @param totalPreemptedResourceAllowed how many resources allowed to be
* preempted in this round * preempted in this round
* @return merged selected candidates. * @return merged selected candidates.
@ -49,4 +54,26 @@ public abstract class PreemptionCandidatesSelector {
public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed); Resource clusterResource, Resource totalPreemptedResourceAllowed);
/**
* Compare by reversed priority order first, and then reversed containerId
* order.
*
* @param containers list of containers to sort for.
*/
@VisibleForTesting
static void sortContainers(List<RMContainer> containers) {
Collections.sort(containers, new Comparator<RMContainer>() {
@Override
public int compare(RMContainer a, RMContainer b) {
int schedKeyComp = b.getAllocatedSchedulerKey()
.compareTo(a.getAllocatedSchedulerKey());
if (schedKeyComp != 0) {
return schedKeyComp;
}
return b.getContainerId().compareTo(a.getContainerId());
}
});
}
} }

View File

@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -91,6 +92,9 @@ public class ProportionalCapacityPreemptionPolicy
private boolean observeOnly; private boolean observeOnly;
private boolean lazyPreempionEnabled; private boolean lazyPreempionEnabled;
private float maxAllowableLimitForIntraQueuePreemption;
private float minimumThresholdForIntraQueuePreemption;
// Pointer to other RM components // Pointer to other RM components
private RMContext rmContext; private RMContext rmContext;
private ResourceCalculator rc; private ResourceCalculator rc;
@ -102,6 +106,8 @@ public class ProportionalCapacityPreemptionPolicy
new HashMap<>(); new HashMap<>();
private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions = private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
new HashMap<>(); new HashMap<>();
private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
new HashMap<String, LinkedHashSet<String>>();
private List<PreemptionCandidatesSelector> private List<PreemptionCandidatesSelector>
candidatesSelectionPolicies = new ArrayList<>(); candidatesSelectionPolicies = new ArrayList<>();
private Set<String> allPartitions; private Set<String> allPartitions;
@ -171,23 +177,44 @@ public class ProportionalCapacityPreemptionPolicy
CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat(
CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
minimumThresholdForIntraQueuePreemption = csConfig.getFloat(
CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
rc = scheduler.getResourceCalculator(); rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager(); nlm = scheduler.getRMContext().getNodeLabelManager();
// Do we need to specially consider reserved containers? // Do we need to specially consider reserved containers?
boolean selectCandidatesForResevedContainers = csConfig.getBoolean( boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, CapacitySchedulerConfiguration.
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
CapacitySchedulerConfiguration.
DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
if (selectCandidatesForResevedContainers) { if (selectCandidatesForResevedContainers) {
candidatesSelectionPolicies.add( candidatesSelectionPolicies
new ReservedContainerCandidatesSelector(this)); .add(new ReservedContainerCandidatesSelector(this));
} }
// initialize candidates preemption selection policies // initialize candidates preemption selection policies
candidatesSelectionPolicies.add( candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
new FifoCandidatesSelector(this));
// Do we need to specially consider intra queue
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
if (isIntraQueuePreemptionEnabled) {
candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
}
} }
@Override @Override
public ResourceCalculator getResourceCalculator() { public ResourceCalculator getResourceCalculator() {
return rc; return rc;
@ -210,6 +237,12 @@ public class ProportionalCapacityPreemptionPolicy
private void preemptOrkillSelectedContainerAfterWait( private void preemptOrkillSelectedContainerAfterWait(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
long currentTime) { long currentTime) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Starting to preempt containers for selectedCandidates and size:"
+ selectedCandidates.size());
}
// preempt (or kill) the selected containers // preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
.entrySet()) { .entrySet()) {
@ -234,6 +267,7 @@ public class ProportionalCapacityPreemptionPolicy
// not have to raise another event. // not have to raise another event.
continue; continue;
} }
//otherwise just send preemption events //otherwise just send preemption events
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container, new ContainerPreemptEvent(appAttemptId, container,
@ -294,7 +328,6 @@ public class ProportionalCapacityPreemptionPolicy
* @param root the root of the CapacityScheduler queue hierarchy * @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster * @param clusterResources the total amount of resources in the cluster
*/ */
@SuppressWarnings("unchecked")
private void containerBasedPreemptOrKill(CSQueue root, private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) { Resource clusterResources) {
// Sync killable containers from scheduler when lazy preemption enabled // Sync killable containers from scheduler when lazy preemption enabled
@ -542,4 +575,41 @@ public class ProportionalCapacityPreemptionPolicy
Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() { Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
return queueToPartitions; return queueToPartitions;
} }
@Override
public int getClusterMaxApplicationPriority() {
return scheduler.getMaxClusterLevelAppPriority().getPriority();
}
@Override
public float getMaxAllowableLimitForIntraQueuePreemption() {
return maxAllowableLimitForIntraQueuePreemption;
}
@Override
public float getMinimumThresholdForIntraQueuePreemption() {
return minimumThresholdForIntraQueuePreemption;
}
@Override
public Resource getPartitionResource(String partition) {
return Resources.clone(nlm.getResourceByLabel(partition,
Resources.clone(scheduler.getClusterResource())));
}
public LinkedHashSet<String> getUnderServedQueuesPerPartition(
String partition) {
return partitionToUnderServedQueues.get(partition);
}
public void addPartitionToUnderServedQueues(String queueName,
String partition) {
LinkedHashSet<String> underServedQueues = partitionToUnderServedQueues
.get(partition);
if (null == underServedQueues) {
underServedQueues = new LinkedHashSet<String>();
partitionToUnderServedQueues.put(partition, underServedQueues);
}
underServedQueues.add(queueName);
}
} }

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Temporary data-structure tracking resource availability, pending resource
* need, current utilization for an application.
*/
public class TempAppPerPartition extends AbstractPreemptionEntity {
// Following fields are settled and used by candidate selection policies
private final int priority;
private final ApplicationId applicationId;
FiCaSchedulerApp app;
TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition,
Resource amUsedPerPartition, Resource reserved,
Resource pendingPerPartition) {
super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved,
pendingPerPartition);
this.priority = app.getPriority().getPriority();
this.applicationId = app.getApplicationId();
this.app = app;
}
public FiCaSchedulerApp getFiCaSchedulerApp() {
return app;
}
public void assignPreemption(Resource killable) {
Resources.addTo(toBePreempted, killable);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
.append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
.append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
.append(idealAssigned).append(" PREEMPT_OTHER: ")
.append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
.append(toBePreempted).append(" ACTUAL_PREEMPT: ")
.append(getActuallyToBePreempted()).append("\n");
return sb.toString();
}
void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ").append(getUsed().getMemorySize())
.append(", ").append(getUsed().getVirtualCores()).append(", ")
.append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ")
.append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemorySize()).append(", ")
.append(toBePreempted.getVirtualCores()).append(", ")
.append(getActuallyToBePreempted().getMemorySize()).append(", ")
.append(getActuallyToBePreempted().getVirtualCores());
}
public int getPriority() {
return priority;
}
public ApplicationId getApplicationId() {
return applicationId;
}
public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
Resource cluster, Resource toBeDeduct, String partition) {
if (Resources.greaterThan(resourceCalculator, cluster,
getActuallyToBePreempted(), toBeDeduct)) {
Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
}
}
}

View File

@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
/** /**
* Temporary data-structure tracking resource availability, pending resource * Temporary data-structure tracking resource availability, pending resource
* need, current utilization. This is per-queue-per-partition data structure * need, current utilization. This is per-queue-per-partition data structure
*/ */
public class TempQueuePerPartition { public class TempQueuePerPartition extends AbstractPreemptionEntity {
// Following fields are copied from scheduler // Following fields are copied from scheduler
final String queueName;
final String partition; final String partition;
final Resource pending;
private final Resource current;
private final Resource killable; private final Resource killable;
private final Resource reserved;
private final float absCapacity; private final float absCapacity;
private final float absMaxCapacity; private final float absMaxCapacity;
final Resource totalPartitionResource; final Resource totalPartitionResource;
// Following fields are setted and used by candidate selection policies // Following fields are settled and used by candidate selection policies
Resource idealAssigned;
Resource toBePreempted;
Resource untouchableExtra; Resource untouchableExtra;
Resource preemptableExtra; Resource preemptableExtra;
private Resource actuallyToBePreempted;
double normalizedGuarantee; double normalizedGuarantee;
final ArrayList<TempQueuePerPartition> children; final ArrayList<TempQueuePerPartition> children;
private Collection<TempAppPerPartition> apps;
LeafQueue leafQueue; LeafQueue leafQueue;
boolean preemptionDisabled; boolean preemptionDisabled;
@ -60,8 +55,8 @@ public class TempQueuePerPartition {
boolean preemptionDisabled, String partition, Resource killable, boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource, float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
Resource reserved, CSQueue queue) { Resource reserved, CSQueue queue) {
this.queueName = queueName; super(queueName, current, Resource.newInstance(0, 0), reserved,
this.current = current; Resource.newInstance(0, 0));
if (queue instanceof LeafQueue) { if (queue instanceof LeafQueue) {
LeafQueue l = (LeafQueue) queue; LeafQueue l = (LeafQueue) queue;
@ -72,11 +67,9 @@ public class TempQueuePerPartition {
pending = Resources.createResource(0); pending = Resources.createResource(0);
} }
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyToBePreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN; this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<>(); this.children = new ArrayList<>();
this.apps = new ArrayList<>();
this.untouchableExtra = Resource.newInstance(0, 0); this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0);
this.preemptionDisabled = preemptionDisabled; this.preemptionDisabled = preemptionDisabled;
@ -85,7 +78,6 @@ public class TempQueuePerPartition {
this.absCapacity = absCapacity; this.absCapacity = absCapacity;
this.absMaxCapacity = absMaxCapacity; this.absMaxCapacity = absMaxCapacity;
this.totalPartitionResource = totalPartitionResource; this.totalPartitionResource = totalPartitionResource;
this.reserved = reserved;
} }
public void setLeafQueue(LeafQueue l) { public void setLeafQueue(LeafQueue l) {
@ -95,7 +87,9 @@ public class TempQueuePerPartition {
/** /**
* When adding a child we also aggregate its pending resource needs. * When adding a child we also aggregate its pending resource needs.
* @param q the child queue to add to this queue *
* @param q
* the child queue to add to this queue
*/ */
public void addChild(TempQueuePerPartition q) { public void addChild(TempQueuePerPartition q) {
assert leafQueue == null; assert leafQueue == null;
@ -103,14 +97,10 @@ public class TempQueuePerPartition {
Resources.addTo(pending, q.pending); Resources.addTo(pending, q.pending);
} }
public ArrayList<TempQueuePerPartition> getChildren(){ public ArrayList<TempQueuePerPartition> getChildren() {
return children; return children;
} }
public Resource getUsed() {
return current;
}
public Resource getUsedDeductReservd() { public Resource getUsedDeductReservd() {
return Resources.subtract(current, reserved); return Resources.subtract(current, reserved);
} }
@ -122,28 +112,30 @@ public class TempQueuePerPartition {
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(getMax(), idealAssigned), Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0)); Resource.newInstance(0, 0));
// remain = avail - min(avail, (max - assigned), (current + pending - assigned)) // remain = avail - min(avail, (max - assigned), (current + pending -
// assigned))
Resource accepted = Resources.min(rc, clusterResource, Resource accepted = Resources.min(rc, clusterResource,
absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, absMaxCapIdealAssignedDelta,
Resources Resources.min(rc, clusterResource, avail, Resources
/* /*
* When we're using FifoPreemptionSelector * When we're using FifoPreemptionSelector (considerReservedResource
* (considerReservedResource = false). * = false).
* *
* We should deduct reserved resource to avoid excessive preemption: * We should deduct reserved resource to avoid excessive preemption:
* *
* For example, if an under-utilized queue has used = reserved = 20. * For example, if an under-utilized queue has used = reserved = 20.
* Preemption policy will try to preempt 20 containers * Preemption policy will try to preempt 20 containers (which is not
* (which is not satisfied) from different hosts. * satisfied) from different hosts.
* *
* In FifoPreemptionSelector, there's no guarantee that preempted * In FifoPreemptionSelector, there's no guarantee that preempted
* resource can be used by pending request, so policy will preempt * resource can be used by pending request, so policy will preempt
* resources repeatly. * resources repeatly.
*/ */
.subtract(Resources.add( .subtract(
(considersReservedResource ? getUsed() : Resources.add((considersReservedResource
getUsedDeductReservd()), ? getUsed()
pending), idealAssigned))); : getUsedDeductReservd()), pending),
idealAssigned)));
Resource remain = Resources.subtract(avail, accepted); Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted); Resources.addTo(idealAssigned, accepted);
return remain; return remain;
@ -162,8 +154,7 @@ public class TempQueuePerPartition {
untouchableExtra = Resources.none(); untouchableExtra = Resources.none();
preemptableExtra = Resources.none(); preemptableExtra = Resources.none();
Resource extra = Resources.subtract(getUsed(), Resource extra = Resources.subtract(getUsed(), getGuaranteed());
getGuaranteed());
if (Resources.lessThan(rc, totalPartitionResource, extra, if (Resources.lessThan(rc, totalPartitionResource, extra,
Resources.none())) { Resources.none())) {
extra = Resources.none(); extra = Resources.none();
@ -197,26 +188,21 @@ public class TempQueuePerPartition {
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(" NAME: " + queueName) sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
.append(" CUR: ").append(current) .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
.append(" PEN: ").append(pending) .append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
.append(" RESERVED: ").append(reserved) .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
.append(" GAR: ").append(getGuaranteed()) .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" NORM: ").append(normalizedGuarantee) .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
.append(" UNTOUCHABLE: ").append(untouchableExtra) .append(" UNTOUCHABLE: ").append(untouchableExtra)
.append(" PREEMPTABLE: ").append(preemptableExtra) .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
.append("\n");
return sb.toString(); return sb.toString();
} }
public void assignPreemption(float scalingFactor, ResourceCalculator rc, public void assignPreemption(float scalingFactor, ResourceCalculator rc,
Resource clusterResource) { Resource clusterResource) {
Resource usedDeductKillable = Resources.subtract( Resource usedDeductKillable = Resources.subtract(getUsed(), killable);
getUsed(), killable);
Resource totalResource = Resources.add(getUsed(), pending); Resource totalResource = Resources.add(getUsed(), pending);
// The minimum resource that we need to keep for a queue is: // The minimum resource that we need to keep for a queue is:
@ -224,7 +210,8 @@ public class TempQueuePerPartition {
// //
// Doing this because when we calculate ideal allocation doesn't consider // Doing this because when we calculate ideal allocation doesn't consider
// reserved resource, ideal-allocation calculated could be less than // reserved resource, ideal-allocation calculated could be less than
// guaranteed and total. We should avoid preempt from a queue if it is already // guaranteed and total. We should avoid preempt from a queue if it is
// already
// <= its guaranteed resource. // <= its guaranteed resource.
Resource minimumQueueResource = Resources.max(rc, clusterResource, Resource minimumQueueResource = Resources.max(rc, clusterResource,
Resources.min(rc, clusterResource, totalResource, getGuaranteed()), Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
@ -233,33 +220,26 @@ public class TempQueuePerPartition {
if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
minimumQueueResource)) { minimumQueueResource)) {
toBePreempted = Resources.multiply( toBePreempted = Resources.multiply(
Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor); Resources.subtract(usedDeductKillable, minimumQueueResource),
scalingFactor);
} else { } else {
toBePreempted = Resources.none(); toBePreempted = Resources.none();
} }
} }
public Resource getActuallyToBePreempted() {
return actuallyToBePreempted;
}
public void setActuallyToBePreempted(Resource res) {
this.actuallyToBePreempted = res;
}
public void deductActuallyToBePreempted(ResourceCalculator rc, public void deductActuallyToBePreempted(ResourceCalculator rc,
Resource cluster, Resource toBeDeduct) { Resource cluster, Resource toBeDeduct) {
if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) { if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(),
Resources.subtractFrom(actuallyToBePreempted, toBeDeduct); toBeDeduct)) {
Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
} }
actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted, setActuallyToBePreempted(Resources.max(rc, cluster,
Resources.none()); getActuallyToBePreempted(), Resources.none()));
} }
void appendLogString(StringBuilder sb) { void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ") sb.append(queueName).append(", ").append(current.getMemorySize())
.append(current.getMemorySize()).append(", ") .append(", ").append(current.getVirtualCores()).append(", ")
.append(current.getVirtualCores()).append(", ")
.append(pending.getMemorySize()).append(", ") .append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ") .append(pending.getVirtualCores()).append(", ")
.append(getGuaranteed().getMemorySize()).append(", ") .append(getGuaranteed().getMemorySize()).append(", ")
@ -267,9 +247,17 @@ public class TempQueuePerPartition {
.append(idealAssigned.getMemorySize()).append(", ") .append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ") .append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemorySize()).append(", ") .append(toBePreempted.getMemorySize()).append(", ")
.append(toBePreempted.getVirtualCores() ).append(", ") .append(toBePreempted.getVirtualCores()).append(", ")
.append(actuallyToBePreempted.getMemorySize()).append(", ") .append(getActuallyToBePreempted().getMemorySize()).append(", ")
.append(actuallyToBePreempted.getVirtualCores()); .append(getActuallyToBePreempted().getVirtualCores());
}
public void addAllApps(Collection<TempAppPerPartition> orderedApps) {
this.apps = orderedApps;
}
public Collection<TempAppPerPartition> getApps() {
return apps;
} }
} }

View File

@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
private static final String PREEMPTION_CONFIG_PREFIX = private static final String PREEMPTION_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.preemption."; "yarn.resourcemanager.monitor.capacity.preemption.";
private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
"intra-queue-preemption.";
/** If true, run the policy but do not affect the cluster with preemption and /** If true, run the policy but do not affect the cluster with preemption and
* kill events. */ * kill events. */
public static final String PREEMPTION_OBSERVE_ONLY = public static final String PREEMPTION_OBSERVE_ONLY =
@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
false; false;
/**
* For intra-queue preemption, priority/user-limit/fairness based selectors
* can help to preempt containers.
*/
public static final String INTRAQUEUE_PREEMPTION_ENABLED =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
/**
* For intra-queue preemption, consider those queues which are above used cap
* limit.
*/
public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
0.5f;
/**
* For intra-queue preemption, allowable maximum-preemptable limit per queue.
*/
public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
0.2f;
} }

View File

@ -78,6 +78,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -527,6 +528,7 @@ public class LeafQueue extends AbstractCSQueue {
// queue metrics are updated, more resource may be available // queue metrics are updated, more resource may be available
// activate the pending applications if possible // activate the pending applications if possible
activateApplications(); activateApplications();
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -1156,8 +1158,9 @@ public class LeafQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerApp application, Resource clusterResource, FiCaSchedulerApp application,
String partition) { String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource, return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application, clusterResource, user, partition, computeUserLimit(application.getUser(), clusterResource, user,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition);
} }
private Resource getHeadroom(User user, private Resource getHeadroom(User user,
@ -1229,7 +1232,7 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels, // Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also // TODO, need consider headroom respect labels also
Resource userLimit = Resource userLimit =
computeUserLimit(application, clusterResource, queueUser, computeUserLimit(application.getUser(), clusterResource, queueUser,
nodePartition, schedulingMode); nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource); setQueueResourceLimitsInfo(clusterResource);
@ -1267,7 +1270,7 @@ public class LeafQueue extends AbstractCSQueue {
} }
@Lock(NoLock.class) @Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application, private Resource computeUserLimit(String userName,
Resource clusterResource, User user, Resource clusterResource, User user,
String nodePartition, SchedulingMode schedulingMode) { String nodePartition, SchedulingMode schedulingMode) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition, Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@ -1367,7 +1370,6 @@ public class LeafQueue extends AbstractCSQueue {
minimumAllocation); minimumAllocation);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
String userName = application.getUser();
LOG.debug("User limit computation for " + userName + LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() + " in queue " + getQueueName() +
" userLimitPercent=" + userLimit + " userLimitPercent=" + userLimit +
@ -2018,6 +2020,17 @@ public class LeafQueue extends AbstractCSQueue {
.getSchedulableEntities()); .getSchedulableEntities());
} }
/**
* Obtain (read-only) collection of all applications.
*/
public Collection<FiCaSchedulerApp> getAllApplications() {
Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
pendingOrderingPolicy.getSchedulableEntities());
apps.addAll(orderingPolicy.getSchedulableEntities());
return Collections.unmodifiableCollection(apps);
}
// Consider the headroom for each user in the queue. // Consider the headroom for each user in the queue.
// Total pending for the queue = // Total pending for the queue =
// sum(for each user(min((user's headroom), sum(user's pending requests)))) // sum(for each user(min((user's headroom), sum(user's pending requests))))
@ -2034,7 +2047,7 @@ public class LeafQueue extends AbstractCSQueue {
if (!userNameToHeadroom.containsKey(userName)) { if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName); User user = getUser(userName);
Resource headroom = Resources.subtract( Resource headroom = Resources.subtract(
computeUserLimit(app, resources, user, partition, computeUserLimit(app.getUser(), resources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition)); user.getUsed(partition));
// Make sure headroom is not negative. // Make sure headroom is not negative.
@ -2056,6 +2069,16 @@ public class LeafQueue extends AbstractCSQueue {
} }
public synchronized Resource getUserLimitPerUser(String userName,
Resource resources, String partition) {
// Check user resource limit
User user = getUser(userName);
return computeUserLimit(userName, resources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
@Override @Override
public void collectSchedulerApplications( public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {
@ -2111,8 +2134,8 @@ public class LeafQueue extends AbstractCSQueue {
} }
/** /**
* return all ignored partition exclusivity RMContainers in the LeafQueue, this * @return all ignored partition exclusivity RMContainers in the LeafQueue,
* will be used by preemption policy. * this will be used by preemption policy.
*/ */
public Map<String, TreeSet<RMContainer>> public Map<String, TreeSet<RMContainer>>
getIgnoreExclusivityRMContainers() { getIgnoreExclusivityRMContainers() {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -311,6 +312,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false; return false;
} }
public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
Map<String, Resource> ret = new HashMap<String, Resource>();
Resource res = null;
for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) {
ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*");
if ((res = ret.get(rr.getNodeLabelExpression())) == null) {
res = Resources.createResource(0, 0);
ret.put(rr.getNodeLabelExpression(), res);
}
Resources.addTo(res,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
}
return ret;
}
public void markContainerForPreemption(ContainerId cont) { public void markContainerForPreemption(ContainerId cont) {
try { try {
writeLock.lock(); writeLock.lock();

View File

@ -65,11 +65,14 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -164,13 +167,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
mClock); mClock);
} }
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, private void mockContainers(String containersConfig, FiCaSchedulerApp app,
String queueName, List<RMContainer> reservedContainers, ApplicationAttemptId attemptId, String queueName,
List<RMContainer> liveContainers) { List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
int containerId = 1; int containerId = 1;
int start = containersConfig.indexOf("=") + 1; int start = containersConfig.indexOf("=") + 1;
int end = -1; int end = -1;
Resource used = Resource.newInstance(0, 0);
Resource pending = Resource.newInstance(0, 0);
Priority pri = Priority.newInstance(0);
while (start < containersConfig.length()) { while (start < containersConfig.length()) {
while (start < containersConfig.length() while (start < containersConfig.length()
&& containersConfig.charAt(start) != '(') { && containersConfig.charAt(start) != '(') {
@ -192,43 +199,52 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
// now we found start/end, get container values // now we found start/end, get container values
String[] values = containersConfig.substring(start + 1, end).split(","); String[] values = containersConfig.substring(start + 1, end).split(",");
if (values.length != 6) { if (values.length < 6 || values.length > 8) {
throw new IllegalArgumentException("Format to define container is:" throw new IllegalArgumentException("Format to define container is:"
+ "(priority,resource,host,expression,repeat,reserved)"); + "(priority,resource,host,expression,repeat,reserved, pending)");
} }
Priority pri = Priority.newInstance(Integer.valueOf(values[0])); pri.setPriority(Integer.valueOf(values[0]));
Resource res = parseResourceFromString(values[1]); Resource res = parseResourceFromString(values[1]);
NodeId host = NodeId.newInstance(values[2], 1); NodeId host = NodeId.newInstance(values[2], 1);
String exp = values[3]; String label = values[3];
String userName = "user";
int repeat = Integer.valueOf(values[4]); int repeat = Integer.valueOf(values[4]);
boolean reserved = Boolean.valueOf(values[5]); boolean reserved = Boolean.valueOf(values[5]);
if (values.length >= 7) {
Resources.addTo(pending, parseResourceFromString(values[6]));
}
if (values.length == 8) {
userName = values[7];
}
for (int i = 0; i < repeat; i++) { for (int i = 0; i < repeat; i++) {
Container c = mock(Container.class); Container c = mock(Container.class);
Resources.addTo(used, res);
when(c.getResource()).thenReturn(res); when(c.getResource()).thenReturn(res);
when(c.getPriority()).thenReturn(pri); when(c.getPriority()).thenReturn(pri);
SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c); SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
RMContainerImpl rmc = mock(RMContainerImpl.class); RMContainerImpl rmc = mock(RMContainerImpl.class);
when(rmc.getAllocatedSchedulerKey()).thenReturn(sk); when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
when(rmc.getAllocatedNode()).thenReturn(host); when(rmc.getAllocatedNode()).thenReturn(host);
when(rmc.getNodeLabelExpression()).thenReturn(exp); when(rmc.getNodeLabelExpression()).thenReturn(label);
when(rmc.getAllocatedResource()).thenReturn(res); when(rmc.getAllocatedResource()).thenReturn(res);
when(rmc.getContainer()).thenReturn(c); when(rmc.getContainer()).thenReturn(c);
when(rmc.getApplicationAttemptId()).thenReturn(attemptId); when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
when(rmc.getQueueName()).thenReturn(queueName); when(rmc.getQueueName()).thenReturn(queueName);
final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); final ContainerId cId = ContainerId.newContainerId(attemptId,
when(rmc.getContainerId()).thenReturn( containerId);
cId); when(rmc.getContainerId()).thenReturn(cId);
doAnswer(new Answer<Integer>() { doAnswer(new Answer<Integer>() {
@Override @Override
public Integer answer(InvocationOnMock invocation) throws Throwable { public Integer answer(InvocationOnMock invocation) throws Throwable {
return cId.compareTo(((RMContainer) invocation.getArguments()[0]) return cId.compareTo(
.getContainerId()); ((RMContainer) invocation.getArguments()[0]).getContainerId());
} }
}).when(rmc).compareTo(any(RMContainer.class)); }).when(rmc).compareTo(any(RMContainer.class));
if (containerId == 1) { if (containerId == 1) {
when(rmc.isAMContainer()).thenReturn(true); when(rmc.isAMContainer()).thenReturn(true);
when(app.getAMResource(label)).thenReturn(res);
} }
if (reserved) { if (reserved) {
@ -243,25 +259,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
// If this is a non-exclusive allocation // If this is a non-exclusive allocation
String partition = null; String partition = null;
if (exp.isEmpty() if (label.isEmpty()
&& !(partition = nodeIdToSchedulerNodes.get(host).getPartition()) && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
.isEmpty()) { .isEmpty()) {
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
queue.getIgnoreExclusivityRMContainers(); .getIgnoreExclusivityRMContainers();
if (!ignoreExclusivityContainers.containsKey(partition)) { if (!ignoreExclusivityContainers.containsKey(partition)) {
ignoreExclusivityContainers.put(partition, ignoreExclusivityContainers.put(partition,
new TreeSet<RMContainer>()); new TreeSet<RMContainer>());
} }
ignoreExclusivityContainers.get(partition).add(rmc); ignoreExclusivityContainers.get(partition).add(rmc);
} }
LOG.debug("add container to app=" + attemptId + " res=" + res LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
+ " node=" + host + " nodeLabelExpression=" + exp + " partition=" + host + " nodeLabelExpression=" + label + " partition="
+ partition); + partition);
containerId++; containerId++;
} }
// Some more app specific aggregated data can be better filled here.
when(app.getPriority()).thenReturn(pri);
when(app.getUser()).thenReturn(userName);
when(app.getCurrentConsumption()).thenReturn(used);
when(app.getCurrentReservation())
.thenReturn(Resources.createResource(0, 0));
Map<String, Resource> pendingForDefaultPartition =
new HashMap<String, Resource>();
// Add for default partition for now.
pendingForDefaultPartition.put(label, pending);
when(app.getTotalPendingRequestsPerPartition())
.thenReturn(pendingForDefaultPartition);
// need to set pending resource in resource usage as well
ResourceUsage ru = new ResourceUsage();
ru.setUsed(label, used);
when(app.getAppAttemptResourceUsage()).thenReturn(ru);
start = end + 1; start = end + 1;
} }
} }
@ -277,6 +312,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
*/ */
private void mockApplications(String appsConfig) { private void mockApplications(String appsConfig) {
int id = 1; int id = 1;
HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
LeafQueue queue = null;
for (String a : appsConfig.split(";")) { for (String a : appsConfig.split(";")) {
String[] strs = a.split("\t"); String[] strs = a.split("\t");
String queueName = strs[0]; String queueName = strs[0];
@ -285,24 +322,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
List<RMContainer> liveContainers = new ArrayList<RMContainer>(); List<RMContainer> liveContainers = new ArrayList<RMContainer>();
List<RMContainer> reservedContainers = new ArrayList<RMContainer>(); List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
ApplicationId appId = ApplicationId.newInstance(0L, id); ApplicationId appId = ApplicationId.newInstance(0L, id);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId
.newInstance(appId, 1);
mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
liveContainers);
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
when(app.getAMResource(anyString()))
.thenReturn(Resources.createResource(0, 0));
mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
liveContainers);
LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
when(app.getLiveContainers()).thenReturn(liveContainers); when(app.getLiveContainers()).thenReturn(liveContainers);
when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId); when(app.getApplicationId()).thenReturn(appId);
when(app.getPriority()).thenReturn(Priority.newInstance(0));
// add to LeafQueue // add to LeafQueue
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); queue = (LeafQueue) nameToCSQueues.get(queueName);
queue.getApplications().add(app); queue.getApplications().add(app);
queue.getAllApplications().add(app);
HashSet<String> users = userMap.get(queueName);
if (null == users) {
users = new HashSet<String>();
userMap.put(queueName, users);
}
users.add(app.getUser());
id++; id++;
} }
for (String queueName : userMap.keySet()) {
queue = (LeafQueue) nameToCSQueues.get(queueName);
// Currently we have user-limit test support only for default label.
Resource totResoucePerPartition = partitionToResource.get("");
Resource capacity = Resources.multiply(totResoucePerPartition,
queue.getQueueCapacities().getAbsoluteCapacity());
HashSet<String> users = userMap.get(queue.getQueueName());
Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
for (String user : users) {
when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
anyString())).thenReturn(userLimit);
}
}
} }
private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container, private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
@ -436,10 +498,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
new Comparator<FiCaSchedulerApp>() { new Comparator<FiCaSchedulerApp>() {
@Override @Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationId().compareTo(a2.getApplicationId()); if (a1.getPriority() != null
&& !a1.getPriority().equals(a2.getPriority())) {
return a1.getPriority().compareTo(a2.getPriority());
}
int res = a1.getApplicationId()
.compareTo(a2.getApplicationId());
return res;
} }
}); });
when(leafQueue.getApplications()).thenReturn(apps); when(leafQueue.getApplications()).thenReturn(apps);
when(leafQueue.getAllApplications()).thenReturn(apps);
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class); OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
when(so.getPreemptionIterator()).thenAnswer(new Answer() { when(so.getPreemptionIterator()).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) { public Object answer(InvocationOnMock invocation) {
@ -518,10 +588,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
float absUsed = Resources.divide(rc, totResoucePerPartition, float absUsed = Resources.divide(rc, totResoucePerPartition,
parseResourceFromString(values[2].trim()), totResoucePerPartition) parseResourceFromString(values[2].trim()), totResoucePerPartition)
+ epsilon; + epsilon;
float used = Resources.divide(rc, totResoucePerPartition,
parseResourceFromString(values[2].trim()),
parseResourceFromString(values[0].trim())) + epsilon;
Resource pending = parseResourceFromString(values[3].trim()); Resource pending = parseResourceFromString(values[3].trim());
qc.setAbsoluteCapacity(partitionName, absGuaranteed); qc.setAbsoluteCapacity(partitionName, absGuaranteed);
qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteMaximumCapacity(partitionName, absMax);
qc.setAbsoluteUsedCapacity(partitionName, absUsed); qc.setAbsoluteUsedCapacity(partitionName, absUsed);
qc.setUsedCapacity(partitionName, used);
when(queue.getUsedCapacity()).thenReturn(used);
ru.setPending(partitionName, pending); ru.setPending(partitionName, pending);
if (!isParent(queueExprArray, idx)) { if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue; LeafQueue lq = (LeafQueue) queue;
@ -536,6 +611,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
reserved = parseResourceFromString(values[4].trim()); reserved = parseResourceFromString(values[4].trim());
ru.setReserved(partitionName, reserved); ru.setReserved(partitionName, reserved);
} }
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ ",abs_used" + absUsed + ",pending_resource=" + pending + ",abs_used" + absUsed + ",pending_resource=" + pending

View File

@ -0,0 +1,868 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test class for IntraQueuePreemption scenarios.
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueue
extends
ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@Test
public void testSimpleIntraQueuePreemption() throws IOException {
/**
* The simplest test preemption, Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Guaranteed resource of a/b/c/d are 11:40:20:29 Total cluster resource =
* 100
* Scenario:
* Queue B has few running apps and two high priority apps have demand.
* Apps which are running at low priority (4) will preempt few of its
* resources to meet the demand.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 120 0]);" + // root
"-a(=[11 100 11 50 0]);" + // a
"-b(=[40 100 38 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[29 100 20 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,6,false,25);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,34,false,20);" + // app3 b
"b\t" // app4 in b
+ "(4,1,n1,,2,false,10);" + // app4 b
"b\t" // app4 in b
+ "(5,1,n1,,1,false,10);" + // app5 b
"b\t" // app4 in b
+ "(6,1,n1,,1,false,10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+ "(1,1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNoPreemptionForSamePriorityApps() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
* 100
* Scenario: In queue A/B, all apps are running at same priority. However
* there are many demands also from these apps. Since all apps are at same
* priority, preemption should not occur here.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 120 0]);" + // root
"-a(=[10 100 10 50 0]);" + // a
"-b(=[40 100 40 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[30 100 20 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,6,false,25);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(1,1,n1,,34,false,20);" + // app3 b
"b\t" // app4 in b
+ "(1,1,n1,,2,false,10);" + // app4 b
"b\t" // app4 in b
+ "(1,1,n1,,1,false,20);" + // app5 b
"b\t" // app4 in b
+ "(1,1,n1,,1,false,10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+ "(1,1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, none of the apps should be preempted.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
}
@Test
public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100 BY
* default, this limit is 50%. Test to verify that there wont be any
* preemption since used capacity is under 50% for queue a/b even though
* there are demands from high priority apps in queue.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 35 80 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 25 30 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue A/B, none of the apps should be preempted as used capacity
// is under 50%.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@Test
public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
* that the maximum preemption should occur upto 50%, eventhough demand is
* more.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 45 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queueB, eventhough app4 needs 100 resources, only 30 resources were
// preempted. (max is 50% of guaranteed cap of any queue
// "maxIntraQueuePreemptable")
verify(mDisp, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* totalPreemption allowed is 10%. This test is to verify that only
* 10% is preempted.
*/
// report "ideal" preempt as 10%. Ensure preemption happens only for 10%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.1);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 45 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B eventhough app4 needs 100 resources, only 10 resources were
// preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
verify(mDisp, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testAlreadySelectedContainerFromInterQueuePreemption()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* QueueB is under utilized and QueueA has to release 9 containers here.
* However within queue A, high priority app has also a demand for 20.
* So additional 11 more containers will be preempted making a tota of 20.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 95 170 0]);" + // root
"-a(=[60 100 70 50 0]);" + // a
"-b(=[40 100 25 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,50,false,15);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,20,false,20);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,20,false,20);" + // app3 b
"b\t" // app1 in a
+ "(4,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// As per intra queue preemption algorithm, 20 more containers were needed
// for app2 (in queue a). Inter queue pre-emption had already preselected 9
// containers and hence preempted only 11 more.
verify(mDisp, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testSkipAMContainersInInterQueuePreemption() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 60:40 Total cluster resource = 100
* While preempting containers during intra-queue preemption, AM containers
* will be spared for now. Verify the same.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 170 0]);" + // root
"-a(=[60 100 60 50 0]);" + // a
"-b(=[40 100 40 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
+ "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
+ "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
+ "(4,1,n1,,20,false,20);" + "b\t" // app5 in a
+ "(4,1,n1,,20,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
verify(mDisp, times(11)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testSkipAMContainersInInterQueuePreemptionSingleApp()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 50:50 Total cluster resource = 100
* Spare Am container from a lower priority app during its preemption
* cycle. Eventhough there are more demand and no other low priority
* apps are present, still AM contaier need to soared.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 170 0]);" + // root
"-a(=[50 100 50 50 0]);" + // a
"-b(=[50 100 50 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "a\t" // app1 in a
+ "(2,1,n1,,40,false,10);" + "b\t" // app2 in a
+ "(4,1,n1,,20,false,20);" + "b\t" // app3 in b
+ "(4,1,n1,,30,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Make sure that app1's Am container is spared. Only 9/10 is preempted.
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoPreemptionForSingleApp() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 60:40 Total cluster resource = 100
* Only one app is running in queue. And it has more demand but no
* resource are available in queue. Preemption must not occur here.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 20 50 0]);" + // root
"-a(=[60 100 20 50 0]);" + // a
"-b(=[40 100 0 0 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(4,1,n1,,20,false,50)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Ensure there are 0 preemptions since only one app is running in queue.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testOverutilizedQueueResourceWithInterQueuePreemption()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
* Scenario:
* Guaranteed resource of a/b are 20:80 Total cluster resource = 100
* QueueB is under utilized and 20 resource will be released from queueA.
* 10 containers will also selected for intra-queue too but it will be
* pre-selected.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 70 0]);" + // root
"-a(=[20 100 100 30 0]);" + // a
"-b(=[80 100 0 20 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,50,false,0);" + "a\t" // app1 in a
+ "(3,1,n1,,50,false,30);" + "b\t" // app2 in a
+ "(4,1,n1,,0,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Complete demand request from QueueB for 20 resource must be preempted.
verify(mDisp, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNodePartitionIntraQueuePreemption() throws IOException {
/**
* The simplest test of node label, Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Both a/b can access x, and guaranteed capacity of them is 50:50. Two
* nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
* app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 50
* NO_LABEL, app3 uses 50 x, app4 uses 50 NO_LABEL. a has 20 pending
* resource for x for app3 of priority 2
*
* After preemption, it should preempt 20 from app1
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
String nodesConfig = "n1=x;" + // n1 has partition=x
"n2="; // n2 is default partition
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100],x=[100 100 100 100]);" + // root
"-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
"-b(=[50 100 50 50],x=[50 100 50 50])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t" // app1 in a
+ "(1,1,n1,x,50,false,10);" + // 50 * x in n1
"a\t" // app2 in a
+ "(2,1,n1,x,0,false,20);" + // 0 * x in n1
"a\t" // app2 in a
+ "(1,1,n2,,50,false);" + // 50 default in n2
"b\t" // app3 in b
+ "(1,1,n1,x,50,false);" + // 50 * x in n1
"b\t" // app4 in b
+ "(1,1,n2,,50,false)"; // 50 default in n2
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 20 preempted from app1
verify(mDisp, times(20))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
verify(mDisp, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@Test
public void testComplexIntraQueuePreemption() throws IOException {
/**
* The complex test preemption, Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Scenario:
* Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
* 100
* All queues under its capacity, but within each queue there are many
* under served applications.
*/
// report "ideal" preempt as 50%. Ensure preemption happens only for 50%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.5);
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 75 130 0]);" + // root
"-a(=[10 100 5 50 0]);" + // a
"-b(=[40 100 35 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[30 100 25 10 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t"
+ "(4,1,n1,,0,false,25);" + // app2 a
"a\t"
+ "(5,1,n1,,0,false,2);" + // app3 a
"b\t"
+ "(3,1,n1,,5,false,20);" + // app4 b
"b\t"
+ "(4,1,n1,,15,false,10);" + // app5 b
"b\t"
+ "(4,1,n1,,10,false,10);" + // app6 b
"b\t"
+ "(5,1,n1,,3,false,5);" + // app7 b
"b\t"
+ "(5,1,n1,,0,false,2);" + // app8 b
"b\t"
+ "(6,1,n1,,2,false,10);" + // app9 in b
"c\t"
+ "(1,1,n1,,8,false,10);" + // app10 in c
"c\t"
+ "(1,1,n1,,2,false,5);" + // app11 in c
"c\t"
+ "(2,1,n1,,0,false,3);" + "d\t" // app12 in c
+ "(2,1,n1,,25,false,0);" + "d\t" // app13 in d
+ "(1,1,n1,,0,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// High priority app in queueA has 30 resource demand. But low priority
// app has only 5 resource. Hence preempt 4 here sparing AM.
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
// Multiple high priority apps has demand of 17. This will be preempted
// from another set of low priority apps.
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
// Only 3 resources will be freed in this round for queue C as we
// are trying to save AM container.
verify(mDisp, times(2)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(10))));
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(11))));
}
@Test
public void testIntraQueuePreemptionWithTwoUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* Consider 2 users in a queue, assume minimum user limit factor is 50%.
* Hence in queueB of 40, each use has a quota of 20. app4 of high priority
* has a demand of 30 and its already using 5. Adhering to userlimit only
* 15 more must be preempted. If its same user,20 would have been preempted
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[60 100 10 50 0]);" + // a
"-b(=[40 100 40 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,35,false,20,user1);" + // app3 b
"b\t" // app4 in b
+ "(6,1,n1,,5,false,30,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Considering user-limit of 50% since only 2 users are there, only preempt
// 15 more (5 is already running) eventhough demand is for 30.
verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testComplexNodePartitionIntraQueuePreemption()
throws IOException {
/**
* The simplest test of node label, Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Both a/b can access x, and guaranteed capacity of them is 50:50. Two
* nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
* app1-app4 in a, and app5-app9 in b.
*
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
String nodesConfig = "n1=x;" + // n1 has partition=x
"n2="; // n2 is default partition
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100],x=[100 100 100 100]);" + // root
"-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
"-b(=[50 100 35 50],x=[50 100 50 50])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t" // app1 in a
+ "(1,1,n1,x,35,false,10);" + // 20 * x in n1
"a\t" // app2 in a
+ "(1,1,n1,x,5,false,10);" + // 20 * x in n1
"a\t" // app3 in a
+ "(2,1,n1,x,0,false,20);" + // 0 * x in n1
"a\t" // app4 in a
+ "(1,1,n2,,50,false);" + // 50 default in n2
"b\t" // app5 in b
+ "(1,1,n1,x,50,false);" + // 50 * x in n1
"b\t" // app6 in b
+ "(1,1,n2,,25,false);" + // 25 * default in n2
"b\t" // app7 in b
+ "(1,1,n2,,3,false);" + // 3 * default in n2
"b\t" // app8 in b
+ "(1,1,n2,,2,false);" + // 2 * default in n2
"b\t" // app9 in b
+ "(5,1,n2,,5,false,30)"; // 50 default in n2
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Label X: app3 has demand of 20 for label X. Hence app2 will loose
// 4 (sparing AM) and 16 more from app1 till preemption limit is met.
verify(mDisp, times(16))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, times(4))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
// Default Label:For a demand of 30, preempt from all low priority
// apps of default label. 25 will be preempted as preemption limit is
// met.
verify(mDisp, times(1))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
verify(mDisp, times(2))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
verify(mDisp, times(22))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
}
}