YARN-4108. CapacityScheduler: Improve preemption to only kill containers that would satisfy the incoming request. (Wangda Tan)

(cherry picked from commit 7e8c9beb41)
Wangda Tan 2016-03-16 16:59:59 -07:00
parent fa7a43529d
commit ae14e5d07f
38 changed files with 1785 additions and 186 deletions
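A note on what the patch does, before the per-file hunks: instead of killing an over-capacity container as soon as its preemption timeout expires, the policy now only marks it killable, and the CapacityScheduler kills marked containers on demand, when the freed space is actually needed by an incoming request (lazy preemption, off by default). Below is a minimal sketch of that two-phase flow; every name in it is illustrative, not the real YARN API:

```java
// Illustrative sketch only, not the actual YARN classes. It shows the
// two-phase "lazy" preemption flow this patch introduces: the policy marks
// containers killable, and the scheduler kills them only when their node
// can satisfy a concrete incoming request.
import java.util.ArrayList;
import java.util.List;

class LazyPreemptionSketch {
  static class Container { int memoryMb; boolean killable; }

  // Phase 1 (policy): over-capacity containers are only *marked* killable.
  static void markKillable(List<Container> overCapacity) {
    for (Container c : overCapacity) {
      c.killable = true; // no kill yet; the resource stays allocated
    }
  }

  // Phase 2 (scheduler): kill marked containers only if doing so actually
  // frees enough space on this node for the pending request.
  static List<Container> selectToKill(List<Container> onNode,
      int freeMb, int requestMb) {
    List<Container> toKill = new ArrayList<>();
    for (Container c : onNode) {
      if (freeMb >= requestMb) {
        break; // request satisfied; leave remaining killable containers alone
      }
      if (c.killable) {
        toKill.add(c);
        freeMb += c.memoryMb;
      }
    }
    return freeMb >= requestMb ? toKill : new ArrayList<>();
  }
}
```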

ProportionalCapacityPreemptionPolicy.java

@@ -35,7 +35,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -49,6 +49,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -125,8 +126,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
   private long maxWaitTime;
   private CapacityScheduler scheduler;
   private long monitoringInterval;
-  private final Map<RMContainer,Long> preempted =
-    new HashMap<RMContainer,Long>();
+  private final Map<RMContainer, Long> preempted = new HashMap<>();
   private ResourceCalculator rc;
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
@@ -135,6 +136,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
       new HashMap<>();
   private RMNodeLabelsManager nlm;
 
+  // Preemptable Entities, synced from scheduler at every run
+  private Map<String, PreemptableQueue> preemptableEntities = null;
+  private Set<ContainerId> killableContainers;
+
   public ProportionalCapacityPreemptionPolicy() {
     clock = SystemClock.getInstance();
   }
@@ -184,6 +189,64 @@ public void editSchedule() {
     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
     containerBasedPreemptOrKill(root, clusterResources);
   }
 
+  @SuppressWarnings("unchecked")
+  private void cleanupStaledKillableContainers(Resource cluster,
+      Set<String> leafQueueNames) {
+    for (String q : leafQueueNames) {
+      for (TempQueuePerPartition tq : getQueuePartitions(q)) {
+        // When queue's used - killable <= guaranteed and, killable > 0, we need
+        // to check if any of killable containers needs to be reverted
+        if (Resources.lessThanOrEqual(rc, cluster,
+            Resources.subtract(tq.current, tq.killable), tq.idealAssigned)
+            && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) {
+          // How many killable resources need to be reverted
+          // need-to-revert = already-marked-killable - (current - ideal)
+          Resource toBeRevertedFromKillable = Resources.subtract(tq.killable,
+              Resources.subtract(tq.current, tq.idealAssigned));
+
+          Resource alreadyReverted = Resources.createResource(0);
+
+          for (RMContainer c : preemptableEntities.get(q).getKillableContainers(
+              tq.partition).values()) {
+            if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted,
+                toBeRevertedFromKillable)) {
+              break;
+            }
+
+            if (Resources.greaterThan(rc, cluster,
+                Resources.add(alreadyReverted, c.getAllocatedResource()),
+                toBeRevertedFromKillable)) {
+              continue;
+            } else {
+              // This container need to be marked to unkillable
+              Resources.addTo(alreadyReverted, c.getAllocatedResource());
+              rmContext.getDispatcher().getEventHandler().handle(
+                  new ContainerPreemptEvent(c.getApplicationAttemptId(), c,
+                      SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void syncKillableContainersFromScheduler() {
+    // sync preemptable entities from scheduler
+    preemptableEntities =
+        scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities();
+
+    killableContainers = new HashSet<>();
+    for (Map.Entry<String, PreemptableQueue> entry : preemptableEntities
+        .entrySet()) {
+      PreemptableQueue entity = entry.getValue();
+      for (Map<ContainerId, RMContainer> map : entity.getKillableContainers()
+          .values()) {
+        killableContainers.addAll(map.keySet());
+      }
+    }
+  }
+
 /**
  * This method selects and tracks containers to be preempted. If a container
@@ -201,6 +264,8 @@ private void containerBasedPreemptOrKill(CSQueue root,
         .getNodeLabelManager().getClusterNodeLabelNames());
     allPartitions.add(RMNodeLabelsManager.NO_LABEL);
 
+    syncKillableContainersFromScheduler();
+
     // extract a summary of the queues from scheduler
     synchronized (scheduler) {
       queueToPartitions.clear();
@@ -228,13 +293,17 @@ private void containerBasedPreemptOrKill(CSQueue root,
       recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
     }
 
+    // remove containers from killable list when we want to preempt less resources
+    // from queue.
+    cleanupStaledKillableContainers(clusterResources, leafQueueNames);
+
     // based on ideal allocation select containers to be preempted from each
     // queue and each application
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
         getContainersToPreempt(leafQueueNames, clusterResources);
 
     if (LOG.isDebugEnabled()) {
-      logToCSV(new ArrayList<String>(leafQueueNames));
+      logToCSV(new ArrayList<>(leafQueueNames));
     }
 
     // if we are in observeOnly mode return before any action is taken
@@ -254,10 +323,10 @@ private void containerBasedPreemptOrKill(CSQueue root,
         // if we tried to preempt this for more than maxWaitTime
         if (preempted.get(container) != null &&
             preempted.get(container) + maxWaitTime < clock.getTime()) {
-          // kill it
+          // mark container killable
           rmContext.getDispatcher().getEventHandler().handle(
               new ContainerPreemptEvent(appAttemptId, container,
-                  SchedulerEventType.KILL_PREEMPTED_CONTAINER));
+                  SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
           preempted.remove(container);
         } else {
           if (preempted.get(container) != null) {
@@ -333,14 +402,14 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
     // qAlloc tracks currently active queues (will decrease progressively as
     // demand is met)
-    List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
+    List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
     // unassigned tracks how much resources are still to assign, initialized
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
 
     // group queues based on whether they have non-zero guaranteed capacity
-    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
-    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
 
     for (TempQueuePerPartition q : qAlloc) {
       if (Resources
@@ -415,8 +484,8 @@ private void computeFixpointAllocation(ResourceCalculator rc,
     // idealAssigned >= current + pending), remove it from consideration.
     // Sort queues from most under-guaranteed to most over-guaranteed.
     TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueuePerPartition> orderedByNeed =
-        new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+        tqComparator);
     for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
       TempQueuePerPartition q = i.next();
       if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
@@ -474,7 +543,7 @@ private void computeFixpointAllocation(ResourceCalculator rc,
   // percentage of guaranteed.
   protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
       PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
-    ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
     while (!orderedByNeed.isEmpty()) {
       TempQueuePerPartition q1 = orderedByNeed.remove();
       underserved.add(q1);
@@ -502,7 +571,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
     if (ignoreGuar) {
       for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
+        q.normalizedGuarantee = 1.0f / queues.size();
       }
     } else {
       for (TempQueuePerPartition q : queues) {
@@ -515,8 +584,9 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
     }
   }
 
-  private String getPartitionByNodeId(NodeId nodeId) {
-    return scheduler.getSchedulerNode(nodeId).getPartition();
+  private String getPartitionByRMContainer(RMContainer rmContainer) {
+    return scheduler.getSchedulerNode(rmContainer.getAllocatedNode())
+        .getPartition();
   }
 
   /**
@@ -534,7 +604,7 @@ private boolean tryPreemptContainerAndDeductResToObtain(
       return false;
     }
 
-    String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
+    String nodePartition = getPartitionByRMContainer(rmContainer);
     Resource toObtainByPartition =
         resourceToObtainByPartitions.get(nodePartition);
 
@@ -575,7 +645,7 @@ private void addToPreemptMap(
       ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
     Set<RMContainer> set;
     if (null == (set = preemptMap.get(appAttemptId))) {
-      set = new HashSet<RMContainer>();
+      set = new HashSet<>();
       preemptMap.put(appAttemptId, set);
     }
     set.add(containerToPreempt);
@@ -587,7 +657,7 @@ private void addToPreemptMap(
    * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
    * account for containers that will naturally complete.
    *
-   * @param queues set of leaf queues to preempt from
+   * @param leafQueueNames set of leaf queues to preempt from
    * @param clusterResource total amount of cluster resources
    * @return a map of applciationID to set of containers to preempt
    */
@@ -595,8 +665,8 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
       Set<String> leafQueueNames, Resource clusterResource) {
     Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
-        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
-    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
+        new HashMap<>();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<>();
 
     // Loop all leaf queues
     for (String queueName : leafQueueNames) {
@@ -614,7 +684,7 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
       LeafQueue leafQueue = null;
 
       Map<String, Resource> resToObtainByPartition =
-          new HashMap<String, Resource>();
+          new HashMap<>();
       for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
         leafQueue = qT.leafQueue;
         // we act only if we are violating balance by more than
@@ -703,7 +773,6 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
    * @param clusterResource
    * @param preemptMap
    * @param skippedAMContainerlist
-   * @param resToObtain
    * @param skippedAMSize
    * @param maxAMCapacityForThisQueue
    */
@@ -751,7 +820,7 @@ private void preemptFrom(FiCaSchedulerApp app,
     // first drop reserved containers towards rsrcPreempt
     List<RMContainer> reservedContainers =
-        new ArrayList<RMContainer>(app.getReservedContainers());
+        new ArrayList<>(app.getReservedContainers());
     for (RMContainer c : reservedContainers) {
       if (resToObtainByPartition.isEmpty()) {
         return;
@@ -771,8 +840,7 @@ private void preemptFrom(FiCaSchedulerApp app,
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
-    List<RMContainer> liveContainers =
-        new ArrayList<RMContainer>(app.getLiveContainers());
+    List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
 
     sortContainers(liveContainers);
 
@@ -788,6 +856,11 @@ private void preemptFrom(FiCaSchedulerApp app,
         continue;
       }
 
+      // Skip already marked to killable containers
+      if (killableContainers.contains(c.getContainerId())) {
+        continue;
+      }
+
       // Try to preempt this container
       tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
           clusterResource, preemptMap);
@@ -826,6 +899,10 @@ public String getPolicyName() {
     return "ProportionalCapacityPreemptionPolicy";
   }
 
+  @VisibleForTesting
+  public Map<RMContainer, Long> getToPreemptContainers() {
+    return preempted;
+  }
+
   /**
    * This method walks a tree of CSQueue and clones the portion of the state
@@ -851,6 +928,11 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
         partitionToLookAt);
     Resource guaranteed = Resources.multiply(partitionResource, absCap);
     Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+    Resource killable = Resources.none();
+    if (null != preemptableEntities.get(queueName)) {
+      killable = preemptableEntities.get(queueName)
+          .getKillableResource(partitionToLookAt);
+    }
 
     // when partition is a non-exclusive partition, the actual maxCapacity
     // could more than specified maxCapacity
@@ -875,7 +957,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
           l.getTotalPendingResourcesConsideringUserLimit(
               partitionResource, partitionToLookAt);
       ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
-          maxCapacity, preemptionDisabled, partitionToLookAt);
+          maxCapacity, preemptionDisabled, partitionToLookAt, killable);
       if (preemptionDisabled) {
         ret.untouchableExtra = extra;
       } else {
@@ -886,7 +968,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
       Resource pending = Resource.newInstance(0, 0);
       ret =
           new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
-              guaranteed, maxCapacity, false, partitionToLookAt);
+              guaranteed, maxCapacity, false, partitionToLookAt, killable);
       Resource childrensPreemptable = Resource.newInstance(0, 0);
       for (CSQueue c : curQueue.getChildQueues()) {
         TempQueuePerPartition subq =
@@ -932,7 +1014,7 @@ private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
     Map<String, TempQueuePerPartition> queuePartitions;
     if (null == (queuePartitions = queueToPartitions.get(queueName))) {
-      queuePartitions = new HashMap<String, TempQueuePerPartition>();
+      queuePartitions = new HashMap<>();
       queueToPartitions.put(queueName, queuePartitions);
     }
     queuePartitions.put(queuePartition.partition, queuePartition);
@@ -971,8 +1053,10 @@ static class TempQueuePerPartition {
     final Resource guaranteed;
     final Resource maxCapacity;
     final String partition;
+    final Resource killable;
     Resource idealAssigned;
     Resource toBePreempted;
+    // For logging purpose
     Resource actuallyPreempted;
     Resource untouchableExtra;
@@ -986,7 +1070,7 @@ static class TempQueuePerPartition {
     TempQueuePerPartition(String queueName, Resource current, Resource pending,
         Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
-        String partition) {
+        String partition, Resource killableResource) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -996,11 +1080,12 @@ static class TempQueuePerPartition {
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
-      this.children = new ArrayList<TempQueuePerPartition>();
+      this.children = new ArrayList<>();
       this.untouchableExtra = Resource.newInstance(0, 0);
       this.preemptableExtra = Resource.newInstance(0, 0);
       this.preemptionDisabled = preemptionDisabled;
       this.partition = partition;
+      this.killable = killableResource;
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -1018,12 +1103,6 @@ public void addChild(TempQueuePerPartition q) {
       Resources.addTo(pending, q.pending);
     }
 
-    public void addChildren(ArrayList<TempQueuePerPartition> queues) {
-      assert leafQueue == null;
-      children.addAll(queues);
-    }
-
     public ArrayList<TempQueuePerPartition> getChildren(){
       return children;
     }
@@ -1064,18 +1143,13 @@ public String toString() {
       return sb.toString();
     }
 
-    public void printAll() {
-      LOG.info(this.toString());
-      for (TempQueuePerPartition sub : this.getChildren()) {
-        sub.printAll();
-      }
-    }
-
     public void assignPreemption(float scalingFactor,
         ResourceCalculator rc, Resource clusterResource) {
-      if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
-        toBePreempted = Resources.multiply(
-            Resources.subtract(current, idealAssigned), scalingFactor);
+      if (Resources.greaterThan(rc, clusterResource,
+          Resources.subtract(current, killable), idealAssigned)) {
+        toBePreempted = Resources.multiply(Resources.subtract(
+            Resources.subtract(current, killable), idealAssigned),
+            scalingFactor);
       } else {
         toBePreempted = Resource.newInstance(0, 0);
       }
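Two pieces of arithmetic in this file are easy to misread, so here is a worked example with made-up figures: the revert amount in cleanupStaledKillableContainers (need-to-revert = killable - (current - ideal)) and the reworked toBePreempted in assignPreemption, which no longer counts already-killable resources toward fresh preemption:

```java
// Worked example (illustrative GB figures, not from the patch) of the two
// formulas above: the revert amount in cleanupStaledKillableContainers and
// the new toBePreempted in assignPreemption.
public class PreemptionMathExample {
  public static void main(String[] args) {
    double current = 20, killable = 8, ideal = 15, scalingFactor = 0.5;

    // cleanupStaledKillableContainers triggers because
    // (current - killable) = 12 <= ideal = 15 and killable > 0:
    // need-to-revert = killable - (current - ideal) = 8 - 5 = 3
    double toRevert = killable - (current - ideal);

    // assignPreemption: killable resources no longer count toward new
    // preemption, so (current - killable - ideal) * factor, floored at 0.
    double toBePreempted =
        Math.max(0, current - killable - ideal) * scalingFactor; // 0.0 here
    System.out.println("revert=" + toRevert + " preempt=" + toBePreempted);
  }
}
```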

RMContainer.java

@@ -30,6 +30,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 
 /**
  * Represents the ResourceManager's view of an application container. See

PreemptableResourceScheduler.java

@@ -45,6 +45,6 @@ public interface PreemptableResourceScheduler extends ResourceScheduler {
    * Ask the scheduler to forcibly interrupt the container given as input
    * @param container
    */
-  void killPreemptedContainer(RMContainer container);
+  void markContainerForKillable(RMContainer container);
 }

ResourceLimits.java

@@ -38,6 +38,8 @@ public class ResourceLimits {
   // containers.
   private volatile Resource headroom;
 
+  private boolean allowPreempt = false;
+
   public ResourceLimits(Resource limit) {
     this(limit, Resources.none());
   }
@@ -72,4 +74,11 @@ public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
     this.amountNeededUnreserve = amountNeededUnreserve;
   }
 
+  public boolean isAllowPreemption() {
+    return allowPreempt;
+  }
+
+  public void setIsAllowPreemption(boolean allowPreempt) {
+    this.allowPreempt = allowPreempt;
+  }
 }

SchedulerNode.java

@@ -64,9 +64,8 @@ public abstract class SchedulerNode {
   private volatile ResourceUtilization nodeUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
 
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, RMContainer> launchedContainers =
+  /** Set of containers that are allocated containers. */
+  protected final Map<ContainerId, RMContainer> launchedContainers =
       new HashMap<>();
 
   private final RMNode rmNode;
@@ -168,7 +167,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) {
    * @param deltaResource Change in the resource allocation.
    * @param increase True if the change is an increase of allocation.
    */
-  private synchronized void changeContainerResource(ContainerId containerId,
+  protected synchronized void changeContainerResource(ContainerId containerId,
       Resource deltaResource, boolean increase) {
     if (increase) {
       deductUnallocatedResource(deltaResource);
@@ -242,7 +241,7 @@ public synchronized boolean isValidContainer(ContainerId containerId) {
    * Update the resources of the node when allocating a new container.
    * @param container Container to allocate.
    */
-  private synchronized void updateResource(Container container) {
+  protected synchronized void updateResource(Container container) {
     addUnallocatedResource(container.getResource());
     --numContainers;
   }

AbstractCSQueue.java

@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -45,6 +46,7 @@
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -440,11 +442,8 @@ private Resource getCurrentLimitResource(String nodePartition,
           Resources.multiplyAndNormalizeDown(resourceCalculator,
               labelManager.getResourceByLabel(nodePartition, clusterResource),
               queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
-      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-        return Resources.min(resourceCalculator, clusterResource,
-            queueMaxResource, currentResourceLimits.getLimit());
-      }
-      return queueMaxResource;
+      return Resources.min(resourceCalculator, clusterResource,
+          queueMaxResource, currentResourceLimits.getLimit());
     } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
       // When we doing non-exclusive resource allocation, maximum capacity of
       // all queues on this label equals to total resource with the label.
@@ -474,12 +473,19 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
     Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
 
-    // Set headroom for currentResourceLimits
-    currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
-        nowTotalUsed));
+    // Set headroom for currentResourceLimits:
+    // When queue is a parent queue: Headroom = limit - used + killable
+    // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
+    Resource usedExceptKillable = nowTotalUsed;
+    if (null != getChildQueues() && !getChildQueues().isEmpty()) {
+      usedExceptKillable = Resources.subtract(nowTotalUsed,
+          getTotalKillableResource(nodePartition));
+    }
+    currentResourceLimits.setHeadroom(
+        Resources.subtract(currentLimitResource, usedExceptKillable));
 
     if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
-        nowTotalUsed, currentLimitResource)) {
+        usedExceptKillable, currentLimitResource)) {
 
       // if reservation continous looking enabled, check to see if could we
       // potentially use this node instead of a reserved node if the application
@@ -491,7 +497,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
           resourceCouldBeUnreserved, Resources.none())) {
         // resource-without-reserved = used - reserved
         Resource newTotalWithoutReservedResource =
-            Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);
+            Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved);
 
         // when total-used-without-reserved-resource < currentLimit, we still
         // have chance to allocate on this node by unreserving some containers
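The headroom comments above are the crux of this hunk: for a parent queue, resources already marked killable count as free, so both the headroom and the limit check use used minus killable. A small sketch with illustrative numbers (Resource and Resources are the real YARN utility classes; the figures are made up):

```java
// Headroom arithmetic from the hunk above, with illustrative values.
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class HeadroomExample {
  public static void main(String[] args) {
    Resource limit = Resource.newInstance(100 * 1024, 100); // 100GB, 100 vcores
    Resource used = Resource.newInstance(90 * 1024, 90);
    Resource killable = Resource.newInstance(20 * 1024, 20);

    // Parent queue: killable containers are treated as already-free space,
    // so usable headroom grows from 10GB to 30GB.
    Resource usedExceptKillable = Resources.subtract(used, killable); // 70GB
    Resource headroom = Resources.subtract(limit, usedExceptKillable); // 30GB

    System.out.println("parent headroom: " + headroom);
  }
}
```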
@@ -620,11 +626,10 @@ public Set<String> getNodeLabelsForQueue() {
     // considering all labels in cluster, only those labels which are
     // use some resource of this queue can be considered.
     Set<String> nodeLabels = new HashSet<String>();
-    if (this.getAccessibleNodeLabels() != null
-        && this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
-      nodeLabels.addAll(Sets.union(this.getQueueCapacities()
-          .getNodePartitionsSet(), this.getQueueResourceUsage()
-          .getNodePartitionsSet()));
+    if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels()
+        .contains(RMNodeLabelsManager.ANY)) {
+      nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(),
+          this.getQueueResourceUsage().getNodePartitionsSet()));
     } else {
       nodeLabels.addAll(this.getAccessibleNodeLabels());
     }
@@ -636,4 +641,14 @@ public Set<String> getNodeLabelsForQueue() {
     }
     return nodeLabels;
   }
+
+  public Resource getTotalKillableResource(String partition) {
+    return csContext.getPreemptionManager().getKillableResource(queueName,
+        partition);
+  }
+
+  public Iterator<RMContainer> getKillableContainers(String partition) {
+    return csContext.getPreemptionManager().getKillableContainers(queueName,
+        partition);
+  }
 }

CSAssignment.java

@@ -26,6 +26,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.List;
+
 @Private
 @Unstable
 public class CSAssignment {
@@ -42,6 +44,7 @@ public class CSAssignment {
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
   private boolean increaseAllocation;
+  private List<RMContainer> containersToKill;
 
   public CSAssignment(Resource resource, NodeType type) {
     this(resource, type, null, null, false, false);
@@ -147,4 +150,12 @@ public boolean isIncreasedAllocation() {
   public void setIncreasedAllocation(boolean flag) {
     increaseAllocation = flag;
   }
+
+  public void setContainersToKill(List<RMContainer> containersToKill) {
+    this.containersToKill = containersToKill;
+  }
+
+  public List<RMContainer> getContainersToKill() {
+    return containersToKill;
+  }
 }

CapacityScheduler.java

@@ -108,6 +108,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -148,6 +150,10 @@ public class CapacityScheduler extends
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
+  private PreemptionManager preemptionManager = new PreemptionManager();
+
+  private volatile boolean isLazyPreemptionEnabled = false;
+
   static final Comparator<CSQueue> nonPartitionedQueueComparator =
       new Comparator<CSQueue>() {
     @Override
@@ -298,12 +304,11 @@ private synchronized void initScheduler(Configuration configuration) throws
     initMaximumResourceCapability(this.conf.getMaximumAllocation());
     this.calculator = this.conf.getResourceCalculator();
     this.usePortForNodeName = this.conf.getUsePortForNodeName();
-    this.applications =
-        new ConcurrentHashMap<ApplicationId,
-            SchedulerApplication<FiCaSchedulerApp>>();
+    this.applications = new ConcurrentHashMap<>();
     this.labelManager = rmContext.getNodeLabelManager();
     authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
     initializeQueues(this.conf);
+    this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
     asyncScheduleInterval =
@@ -369,6 +374,9 @@ public void serviceStop() throws Exception {
       refreshMaximumAllocation(this.conf.getMaximumAllocation());
       throw new IOException("Failed to re-init queues", t);
     }
+
+    // update lazy preemption
+    this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
   }
 
   long getAsyncScheduleInterval() {
@@ -503,6 +511,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
     LOG.info("Initialized root queue " + root);
     updatePlacementRules();
     setQueueAcls(authorizer, queues);
+
+    // Notify Preemption Manager
+    preemptionManager.refreshQueues(null, root);
   }
 
   @Lock(CapacityScheduler.class)
@@ -531,6 +542,9 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     setQueueAcls(authorizer, queues);
+
+    // Notify Preemption Manager
+    preemptionManager.refreshQueues(null, root);
   }
 
   @VisibleForTesting
@@ -1253,8 +1267,10 @@ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
 
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
-      if (calculator.computeAvailableContainers(node.getUnallocatedResource(),
-          minimumAllocation) > 0) {
+      if (calculator.computeAvailableContainers(Resources
+          .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
+          minimumAllocation) > 0) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getUnallocatedResource());
@@ -1263,10 +1279,8 @@ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
         assignment = root.assignContainers(
             getClusterResource(),
             node,
-            // TODO, now we only consider limits for parent for non-labeled
-            // resources, should consider labeled resources as well.
             new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, getClusterResource())),
+                node.getPartition(), getClusterResource())),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
         if (Resources.greaterThan(calculator, getClusterResource(),
             assignment.getResource(), Resources.none())) {
@@ -1436,11 +1450,20 @@ public void handle(SchedulerEvent event) {
         markContainerForPreemption(aid, containerToBePreempted);
       }
     }
     break;
-    case KILL_PREEMPTED_CONTAINER:
+    case MARK_CONTAINER_FOR_KILLABLE:
     {
-      ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
-      RMContainer containerToBeKilled = killContainerEvent.getContainer();
-      killPreemptedContainer(containerToBeKilled);
+      ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent)event;
+      RMContainer killableContainer = containerKillableEvent.getContainer();
+      markContainerForKillable(killableContainer);
+    }
+    break;
+    case MARK_CONTAINER_FOR_NONKILLABLE:
+    {
+      if (isLazyPreemptionEnabled) {
+        ContainerPreemptEvent cancelKillContainerEvent =
+            (ContainerPreemptEvent) event;
+        markContainerForNonKillable(cancelKillContainerEvent.getContainer());
+      }
     }
     break;
     default:
@@ -1548,14 +1571,14 @@ private void rollbackContainerResource(
   protected void completedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
     Container container = rmContainer.getContainer();
+    ContainerId containerId = container.getId();
     // Get the application for the finished container
     FiCaSchedulerApp application =
         getCurrentAttemptForContainer(container.getId());
     ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
+        containerId.getApplicationAttemptId().getApplicationId();
     if (application == null) {
       LOG.info("Container " + container + " of" + " finished application "
           + appId + " completed with event " + event);
@@ -1569,15 +1592,6 @@ protected void completedContainerInternal(
     LeafQueue queue = (LeafQueue)application.getQueue();
     queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
-
-    if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
-      schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
-          container.getId(), queue.getQueuePath());
-      schedulerHealth.updateSchedulerPreemptionCounts(1);
-    } else {
-      schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(),
-          container.getId(), queue.getQueuePath());
-    }
   }
 
   @Override
@@ -1613,7 +1627,7 @@ public FiCaSchedulerApp getApplicationAttempt(
       ApplicationAttemptId applicationAttemptId) {
     return super.getApplicationAttempt(applicationAttemptId);
   }
 
   @Lock(Lock.NoLock.class)
   public FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodeTracker.getNode(nodeId);
@@ -1654,15 +1668,60 @@ public void markContainerForPreemption(ApplicationAttemptId aid,
     }
   }
 
-  @Override
-  public void killPreemptedContainer(RMContainer cont) {
+  public synchronized void markContainerForKillable(
+      RMContainer killableContainer) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container"
-          + cont.toString());
+      LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+          + killableContainer.toString());
+    }
+
+    if (!isLazyPreemptionEnabled) {
+      super.completedContainer(killableContainer, SchedulerUtils
+          .createPreemptedContainerStatus(killableContainer.getContainerId(),
+              SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
+    } else {
+      FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
+          killableContainer.getAllocatedNode());
+
+      FiCaSchedulerApp application = getCurrentAttemptForContainer(
+          killableContainer.getContainerId());
+
+      node.markContainerToKillable(killableContainer.getContainerId());
+
+      // notify PreemptionManager
+      // Get the application for the finished container
+      if (null != application) {
+        String leafQueueName = application.getCSLeafQueue().getQueueName();
+        getPreemptionManager().addKillableContainer(
+            new KillableContainer(killableContainer, node.getPartition(),
+                leafQueueName));
+      }
+    }
+  }
+
+  private synchronized void markContainerForNonKillable(
+      RMContainer nonKillableContainer) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
+              + nonKillableContainer.toString());
+    }
+
+    FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
+        nonKillableContainer.getAllocatedNode());
+
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(
+        nonKillableContainer.getContainerId());
+
+    node.markContainerToNonKillable(nonKillableContainer.getContainerId());
+
+    // notify PreemptionManager
+    // Get the application for the finished container
+    if (null != application) {
+      String leafQueueName = application.getCSLeafQueue().getQueueName();
+      getPreemptionManager().removeKillableContainer(
+          new KillableContainer(nonKillableContainer, node.getPartition(),
+              leafQueueName));
     }
-    super.completedContainer(cont, SchedulerUtils
-        .createPreemptedContainerStatus(cont.getContainerId(),
-            SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
   }
 
   @Override
@@ -1945,6 +2004,7 @@ public Set<String> getPlanQueues() {
     return ret;
   }
 
+  @Override
   public SchedulerHealth getSchedulerHealth() {
     return this.schedulerHealth;
   }
@@ -1953,6 +2013,11 @@ private void setLastNodeUpdateTime(long time) {
     this.lastNodeUpdateTime = time;
   }
 
+  @Override
+  public long getLastNodeUpdateTime() {
+    return lastNodeUpdateTime;
+  }
+
   @Override
   public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
       String user, String queueName, ApplicationId applicationId)
@@ -2054,4 +2119,9 @@ public void updateApplicationPriority(Priority newPriority,
         + rmApp.getQueue() + " for application: " + applicationId
         + " for the user: " + rmApp.getUser());
   }
+
+  @Override
+  public PreemptionManager getPreemptionManager() {
+    return preemptionManager;
+  }
 }
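On the allocation side, the computeAvailableContainers change earlier in this file makes killable resources count as schedulable space when the scheduler sizes up a node. A worked illustration with made-up numbers (the real code goes through ResourceCalculator and Resource objects):

```java
// Worked illustration of the computeAvailableContainers change above,
// with made-up numbers.
public class AvailableWithKillableExample {
  public static void main(String[] args) {
    int unallocatedMb = 1024;      // free space on the node
    int killableMb = 3072;         // containers on the node marked killable
    int minimumAllocationMb = 1024;

    // Before the patch: 1024 / 1024 = 1 schedulable container.
    // After: (1024 + 3072) / 1024 = 4, so the scheduler will try this node
    // and, if it allocates, kill just enough marked containers to fit.
    int available = (unallocatedMb + killableMb) / minimumAllocationMb;
    System.out.println(available);
  }
}
```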

CapacitySchedulerConfiguration.java

@@ -257,6 +257,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   public static final String RESERVATION_ENFORCEMENT_WINDOW =
       "reservation-enforcement-window";
 
+  @Private
+  public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled";
+
+  @Private
+  public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
+
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -1007,7 +1013,11 @@ public void setOrderingPolicy(String queue, String policy) {
   @VisibleForTesting
   public void setOrderingPolicyParameter(String queue,
       String parameterKey, String parameterValue) {
-    set(getQueuePrefix(queue) + ORDERING_POLICY + "."
-        + parameterKey, parameterValue);
+    set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey,
+        parameterValue);
+  }
+
+  public boolean getLazyPreemptionEnabled() {
+    return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
 }
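The new switch defaults to false, so MARK_CONTAINER_FOR_KILLABLE keeps the old kill-immediately behavior unless it is enabled. Since LAZY_PREEMPTION_ENALBED is PREFIX + "lazy-preemption-enabled" and the capacity scheduler prefix is yarn.scheduler.capacity., the XML key is yarn.scheduler.capacity.lazy-preemption-enabled; in code (for example in a test) it can be toggled like this:

```java
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class EnableLazyPreemption {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Equivalent to setting yarn.scheduler.capacity.lazy-preemption-enabled=true
    // in capacity-scheduler.xml.
    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, true);
    System.out.println(conf.getLazyPreemptionEnabled()); // prints true
  }
}
```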

CapacitySchedulerContext.java

@@ -18,17 +18,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.util.Comparator;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
-import java.util.Comparator;
-
 /**
  * Read-only interface to {@link CapacityScheduler} context.
  */
@@ -61,4 +64,12 @@ public interface CapacitySchedulerContext {
   PartitionedQueueComparator getPartitionedQueueComparator();
 
   FiCaSchedulerNode getNode(NodeId nodeId);
+
+  FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId);
+
+  PreemptionManager getPreemptionManager();
+
+  SchedulerHealth getSchedulerHealth();
+
+  long getLastNodeUpdateTime();
 }

LeafQueue.java

@@ -37,9 +37,11 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -63,7 +65,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
@@ -823,6 +827,40 @@ private void handleExcessReservedContainer(Resource clusterResource,
       assignment.setExcessReservation(null);
     }
   }
 
+  private void killToPreemptContainers(Resource clusterResource,
+      FiCaSchedulerNode node,
+      CSAssignment assignment) {
+    if (assignment.getContainersToKill() != null) {
+      StringBuilder sb = new StringBuilder("Killing containers: [");
+
+      for (RMContainer c : assignment.getContainersToKill()) {
+        FiCaSchedulerApp application = csContext.getApplicationAttempt(
+            c.getApplicationAttemptId());
+        LeafQueue q = application.getCSLeafQueue();
+        q.completedContainer(clusterResource, application, node, c, SchedulerUtils
+            .createPreemptedContainerStatus(c.getContainerId(),
+                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
+            null, false);
+        sb.append("(container=" + c.getContainerId() + " resource=" + c
+            .getAllocatedResource() + ")");
+      }
+
+      sb.append("] for container=" + assignment.getAssignmentInformation()
+          .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment
+          .getResource());
+      LOG.info(sb.toString());
+    }
+  }
+
+  private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
+    // Set preemption-allowed:
+    // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
+    float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
+    float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
+    limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
+  }
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
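setPreemptionAllowed above gates the whole mechanism per queue and partition: only a leaf queue running below its guarantee may trigger kills of other queues' killable containers. A one-line comparison of absolute capacities, with illustrative values:

```java
// Illustrative check mirroring setPreemptionAllowed above; the capacity
// fractions are made up.
public class PreemptionAllowedExample {
  public static void main(String[] args) {
    float used = 0.4f;       // absolute used capacity in this partition
    float guaranteed = 0.5f; // absolute guaranteed capacity
    boolean allowPreempt = used < guaranteed; // under-utilized, so may preempt
    System.out.println(allowPreempt); // true: queue can take killable resources
  }
}
```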
@ -835,6 +873,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ " #applications=" + orderingPolicy.getNumSchedulableEntities()); + " #applications=" + orderingPolicy.getNumSchedulableEntities());
} }
setPreemptionAllowed(currentResourceLimits, node.getPartition());
// Check for reserved resources // Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) { if (reservedContainer != null) {
@ -846,6 +886,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
currentResourceLimits, schedulingMode, reservedContainer); currentResourceLimits, schedulingMode, reservedContainer);
handleExcessReservedContainer(clusterResource, assignment, node, handleExcessReservedContainer(clusterResource, assignment, node,
application); application);
killToPreemptContainers(clusterResource, node, assignment);
return assignment; return assignment;
} }
} }
@ -907,6 +948,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
handleExcessReservedContainer(clusterResource, assignment, node, handleExcessReservedContainer(clusterResource, assignment, node,
application); application);
killToPreemptContainers(clusterResource, node, assignment);
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
@ -1210,11 +1252,34 @@ public void unreserveIncreasedContainer(Resource clusterResource,
}
}
private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
if (null == schedulerHealth) {
// Nothing to update if schedulerHealth is not available
return;
}
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
rmContainer.getContainerId(), getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
getQueuePath());
}
}
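A standalone sketch of the branch above (the -102 value matches ContainerExitStatus.PREEMPTED in YARN; the rest is hypothetical): preempted exits feed the preemption counters, any other exit is recorded as an ordinary release.

public class HealthBranchSketch {
    static final int PREEMPTED = -102; // ContainerExitStatus.PREEMPTED

    static String classify(int exitStatus) {
        return exitStatus == PREEMPTED ? "updatePreemption" : "updateRelease";
    }

    public static void main(String[] args) {
        System.out.println(classify(-102)); // updatePreemption
        System.out.println(classify(0));    // updateRelease
    }
}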
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
// Update SchedulerHealth for released / preempted container
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
if (application != null) {
// unreserve container increase request if it was previously reserved.
if (rmContainer.hasIncreaseReservation()) {
@ -1265,6 +1330,10 @@ public void completedContainer(Resource clusterResource,
rmContainer, null, event, this, sortQueues);
}
}
// Notify PreemptionManager
csContext.getPreemptionManager().removeKillableContainer(
new KillableContainer(rmContainer, node.getPartition(), queueName));
}
synchronized void allocateResource(Resource clusterResource,

View File

@ -18,18 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,6 +37,7 @@
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -57,12 +46,25 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@ -386,6 +388,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it is not able to access partition=" + node
.getPartition());
}
return CSAssignment.NULL_ASSIGNMENT;
}
@ -431,7 +438,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
allocateResource(clusterResource, assignedToChild.getResource(),
node.getPartition(), assignedToChild.isIncreasedAllocation());
// Track resource utilization in this pass of the scheduler
@ -494,29 +501,38 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
// Two conditions need to be met when trying to allocate:
// 1) Node doesn't have a reserved container
// 2) Node's available-resource + killable-resource should be > 0
return node.getReservedContainer() == null && Resources.greaterThanOrEqual(
resourceCalculator, clusterResource, Resources
.add(node.getUnallocatedResource(), node.getTotalKillableResources()),
minimumAllocation);
}
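A runnable sketch of the two-condition check, using plain megabyte counts in place of YARN Resource objects (all names and numbers are illustrative):

public class CanAssignSketch {
    static boolean canAssign(boolean hasReservedContainer, long unallocatedMb,
            long killableMb, long minimumAllocationMb) {
        // 1) the node has no reserved container, and
        // 2) unallocated + killable space can still hold a minimum allocation
        return !hasReservedContainer
                && unallocatedMb + killableMb >= minimumAllocationMb;
    }

    public static void main(String[] args) {
        System.out.println(canAssign(false, 0, 2048, 1024));  // true: killable space counts
        System.out.println(canAssign(false, 512, 0, 1024));   // false: not enough space
        System.out.println(canAssign(true, 8192, 0, 1024));   // false: node is reserved
    }
}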
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits parentLimits,
String nodePartition) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// Parent available resource = parent-limit - parent-used-resource
Resource parentMaxAvailableResource = Resources.subtract(
parentLimits.getLimit(), queueUsage.getUsed(nodePartition));
// Deduct killable from used
Resources.addTo(parentMaxAvailableResource,
getTotalKillableResource(nodePartition));
// Child's limit = parent-available-resource + child-used
Resource childLimit = Resources.add(parentMaxAvailableResource,
child.getQueueResourceUsage().getUsed(nodePartition));
// Get child's max resource
Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition),
minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
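Worked numerically (figures invented for illustration), the limit computation above behaves like this sketch: killable resource is handed back to the parent's headroom before the child's configured cap is applied.

public class ChildLimitSketch {
    static long childLimitGb(long parentLimitGb, long parentUsedGb,
            long parentKillableGb, long childUsedGb, long childMaxGb) {
        // parent available = parent-limit - parent-used + killable
        long parentMaxAvailable = parentLimitGb - parentUsedGb + parentKillableGb;
        // child limit = min(parent-available + child-used, child-max)
        return Math.min(parentMaxAvailable + childUsedGb, childMaxGb);
    }

    public static void main(String[] args) {
        // limit=100G, used=90G (10G killable), child used=20G, child max=50G -> 40G
        System.out.println(childLimitGb(100, 90, 10, 20, 50));
    }
}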
@ -568,7 +584,7 @@ private synchronized CSAssignment assignContainersToChildQueues(
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition());
assignment = childQueue.assignContainers(cluster, node,
childLimits, schedulingMode);
@ -714,8 +730,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
// Update all children
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL);
childQueue.updateClusterResource(clusterResource, childLimits);
}
@ -738,8 +754,8 @@ public void recoverContainer(Resource clusterResource,
synchronized (this) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
rmContainer.getContainer().getResource(), node.getPartition(), false);
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@ -766,7 +782,7 @@ public void attachContainer(Resource clusterResource,
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), node.getPartition(), false);
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
@ -802,4 +818,79 @@ public void detachContainer(Resource clusterResource,
public synchronized int getNumApplications() {
return numApplications;
}
synchronized void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
super.allocateResource(clusterResource, resource, nodePartition,
changeContainerResource);
/**
* Check whether we need to kill (killable) containers if the maximum resource is
* violated. We do this because killable resource is deducted from used capacity
* while descending from the root.
* For example:
* <pre>
* Root
* / \
* a b
* / \
* a1 a2
* </pre>
*
* a: max=10G, used=10G, killable=2G
* a1: used=8G, killable=2G
* a2: used=2G, pending=2G, killable=0G
*
* When we get queue-a to allocate resource, even if queue-a
* reaches its max resource, we deduct its used by killable, so we can allocate
* at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G.
*
* If scheduler finds a 2G available resource in existing cluster, and assigns it
* to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G
*
* When this happens, we have to preempt killable container (on same or different
* nodes) of parent queue to avoid violating parent's max resource.
*/
if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
< getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
}
}
private void killContainersToEnforceMaxQueueCapacity(String partition,
Resource clusterResource) {
Iterator<RMContainer> killableContainerIter = getKillableContainers(
partition);
if (!killableContainerIter.hasNext()) {
return;
}
Resource partitionResource = labelManager.getResourceByLabel(partition,
null);
Resource maxResource = Resources.multiply(partitionResource,
getQueueCapacities().getAbsoluteMaximumCapacity(partition));
while (Resources.greaterThan(resourceCalculator, partitionResource,
queueUsage.getUsed(partition), maxResource)) {
RMContainer toKillContainer = killableContainerIter.next();
FiCaSchedulerApp attempt = csContext.getApplicationAttempt(
toKillContainer.getContainerId().getApplicationAttemptId());
FiCaSchedulerNode node = csContext.getNode(
toKillContainer.getAllocatedNode());
if (null != attempt && null != node) {
LeafQueue lq = attempt.getCSLeafQueue();
lq.completedContainer(clusterResource, attempt, node, toKillContainer,
SchedulerUtils.createPreemptedContainerStatus(
toKillContainer.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
null, false);
LOG.info("Killed container=" + toKillContainer.getContainerId()
+ " from queue=" + lq.getQueueName() + " to make queue=" + this
.getQueueName() + "'s max-capacity enforced");
}
if (!killableContainerIter.hasNext()) {
break;
}
}
}
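The enforcement loop above can be pictured with this self-contained sketch (hypothetical sizes, memory only): killable containers are released one at a time until usage drops back under the queue's max capacity or no candidates remain.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class EnforceMaxSketch {
    public static void main(String[] args) {
        long usedGb = 12, maxGb = 10; // the a1/a2 example above: 12G > 10G
        Queue<Long> killableGb = new ArrayDeque<>(Arrays.asList(2L, 2L));
        while (usedGb > maxGb && !killableGb.isEmpty()) {
            usedGb -= killableGb.poll(); // completedContainer(...) in the real code
        }
        System.out.println("used=" + usedGb + "G"); // used=10G: max no longer violated
    }
}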
}

View File

@ -108,6 +108,8 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
assignment.setFulfilledReservation(true);
}
}
assignment.setContainersToKill(result.getToKillContainers());
}
return assignment;

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.List;
public class ContainerAllocation {
/**
* Skip the locality (e.g. node-local, rack-local, any), and look at other
@ -56,6 +59,7 @@ public class ContainerAllocation {
NodeType containerNodeType = NodeType.NODE_LOCAL;
NodeType requestNodeType = NodeType.NODE_LOCAL;
Container updatedContainer;
private List<RMContainer> toKillContainers;
public ContainerAllocation(RMContainer containerToBeUnreserved,
Resource resourceToBeAllocated, AllocationState state) {
@ -86,4 +90,12 @@ public NodeType getContainerNodeType() {
public Container getUpdatedContainer() {
return updatedContainer;
}
public void setToKillContainers(List<RMContainer> toKillContainers) {
this.toKillContainers = toKillContainers;
}
public List<RMContainer> getToKillContainers() {
return toKillContainers;
}
}

View File

@ -42,6 +42,9 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.List;
/**
* Allocate normal (new) containers, considers locality/label, etc. Using
* delayed scheduling mechanism to get better locality allocation.
@ -435,9 +438,6 @@ private ContainerAllocation assignContainer(Resource clusterResource,
return ContainerAllocation.LOCALITY_SKIPPED;
}
assert Resources.greaterThan(
rc, clusterResource, available, Resources.none());
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
priority, capability);
@ -460,6 +460,29 @@ private ContainerAllocation assignContainer(Resource clusterResource,
boolean reservationsContinueLooking =
application.getCSLeafQueue().getReservationContinueLooking();
// Check if we need to kill some containers to allocate this one
List<RMContainer> toKillContainers = null;
if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) {
Resource availableAndKillable = Resources.clone(available);
for (RMContainer killableContainer : node
.getKillableContainers().values()) {
if (null == toKillContainers) {
toKillContainers = new ArrayList<>();
}
toKillContainers.add(killableContainer);
Resources.addTo(availableAndKillable,
killableContainer.getAllocatedResource());
if (Resources.fitsIn(rc,
clusterResource,
capability,
availableAndKillable)) {
// Stop once we have found enough space
availableContainers = 1;
break;
}
}
}
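The scan above stops as soon as enough space has been accumulated, so only the containers actually needed for the incoming request get marked. A standalone sketch with invented sizes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KillToFitSketch {
    public static void main(String[] args) {
        long availableMb = 1024;                        // free space on the node
        long requestMb = 4096;                          // incoming request
        List<Long> killableMb = Arrays.asList(2048L, 2048L, 2048L);
        List<Long> toKill = new ArrayList<>();
        for (long c : killableMb) {
            if (availableMb >= requestMb) {
                break;                                  // stop once the request fits
            }
            toKill.add(c);                              // mark this container
            availableMb += c;                           // its resource would be freed
        }
        // Only two of the three candidates are needed:
        System.out.println(toKill + " -> available=" + availableMb + "MB");
    }
}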
if (availableContainers > 0) {
// Allocate...
// We will only do continuous reservation when this is not allocated from
@ -499,12 +522,12 @@ private ContainerAllocation assignContainer(Resource clusterResource,
new ContainerAllocation(unreservedContainer, request.getCapability(),
AllocationState.ALLOCATED);
result.containerNodeType = type;
result.setToKillContainers(toKillContainers);
return result;
} else {
// if we are allowed to allocate but this node doesn't have space, reserve
// it, or if this was already a reserved container, reserve it again
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly be ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we didn't need to
@ -522,6 +545,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
new ContainerAllocation(null, request.getCapability(),
AllocationState.RESERVED);
result.containerNodeType = type;
result.setToKillContainers(null);
return result;
}
// Skip the locality request
@ -613,8 +637,7 @@ private ContainerAllocation handleNewContainerAllocation(
}
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
FiCaSchedulerNode node, Priority priority,
RMContainer reservedContainer) {
// Create the container if necessary
Container container =
@ -678,9 +701,7 @@ private ContainerAllocation allocate(Resource clusterResource,
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result = doAllocation(result, node, priority, reservedContainer);
}
return result;

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class KillableContainer {
RMContainer container;
String partition;
String leafQueueName;
public KillableContainer(RMContainer container, String partition, String leafQueueName) {
this.container = container;
this.partition = partition;
this.leafQueueName = leafQueueName;
}
public RMContainer getRMContainer() {
return this.container;
}
public String getNodePartition() {
return this.partition;
}
public String getLeafQueueName() {
return this.leafQueueName;
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
public class PreemptableQueue {
// Partition -> killable resources and containers
private Map<String, Resource> totalKillableResources = new HashMap<>();
private Map<String, Map<ContainerId, RMContainer>> killableContainers =
new HashMap<>();
private PreemptableQueue parent;
public PreemptableQueue(PreemptableQueue parent) {
this.parent = parent;
}
public PreemptableQueue(Map<String, Resource> totalKillableResources,
Map<String, Map<ContainerId, RMContainer>> killableContainers) {
this.totalKillableResources = totalKillableResources;
this.killableContainers = killableContainers;
}
void addKillableContainer(KillableContainer container) {
String partition = container.getNodePartition();
if (!totalKillableResources.containsKey(partition)) {
totalKillableResources.put(partition, Resources.createResource(0));
killableContainers.put(partition,
new ConcurrentSkipListMap<ContainerId, RMContainer>());
}
RMContainer c = container.getRMContainer();
Resources.addTo(totalKillableResources.get(partition),
c.getAllocatedResource());
killableContainers.get(partition).put(c.getContainerId(), c);
if (null != parent) {
parent.addKillableContainer(container);
}
}
void removeKillableContainer(KillableContainer container) {
String partition = container.getNodePartition();
Map<ContainerId, RMContainer> partitionKillableContainers =
killableContainers.get(partition);
if (partitionKillableContainers != null) {
RMContainer rmContainer = partitionKillableContainers.remove(
container.getRMContainer().getContainerId());
if (null != rmContainer) {
Resources.subtractFrom(totalKillableResources.get(partition),
rmContainer.getAllocatedResource());
}
}
if (null != parent) {
parent.removeKillableContainer(container);
}
}
public Resource getKillableResource(String partition) {
Resource res = totalKillableResources.get(partition);
return res == null ? Resources.none() : res;
}
@SuppressWarnings("unchecked")
public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
Map<ContainerId, RMContainer> map = killableContainers.get(partition);
return map == null ? Collections.EMPTY_MAP : map;
}
public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
return killableContainers;
}
Map<String, Resource> getTotalKillableResources() {
return totalKillableResources;
}
}
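The parent chaining in addKillableContainer/removeKillableContainer means a leaf-queue update is immediately visible at every ancestor. A minimal standalone model of that idea (not the patch's class):

public class ChainedKillableSketch {
    private final ChainedKillableSketch parent;
    private long killableMb;

    ChainedKillableSketch(ChainedKillableSketch parent) {
        this.parent = parent;
    }

    void add(long mb) {
        killableMb += mb;
        if (parent != null) {
            parent.add(mb); // same upward recursion as addKillableContainer
        }
    }

    public static void main(String[] args) {
        ChainedKillableSketch root = new ChainedKillableSketch(null);
        ChainedKillableSketch leaf = new ChainedKillableSketch(root);
        leaf.add(2048);
        System.out.println(root.killableMb); // 2048: totals stay consistent up the tree
    }
}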

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PreemptionManager {
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
private Map<String, PreemptableQueue> entities = new HashMap<>();
public PreemptionManager() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void refreshQueues(CSQueue parent, CSQueue current) {
try {
writeLock.lock();
PreemptableQueue parentEntity = null;
if (parent != null) {
parentEntity = entities.get(parent.getQueueName());
}
if (!entities.containsKey(current.getQueueName())) {
entities.put(current.getQueueName(),
new PreemptableQueue(parentEntity));
}
if (current.getChildQueues() != null) {
for (CSQueue child : current.getChildQueues()) {
refreshQueues(current, child);
}
}
}
finally {
writeLock.unlock();
}
}
public void addKillableContainer(KillableContainer container) {
try {
writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.addKillableContainer(container);
}
}
finally {
writeLock.unlock();
}
}
public void removeKillableContainer(KillableContainer container) {
try {
writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.removeKillableContainer(container);
}
}
finally {
writeLock.unlock();
}
}
public void moveKillableContainer(KillableContainer oldContainer,
KillableContainer newContainer) {
// TODO, will be called when partition of the node changed OR
// container moved to different queue
}
public void updateKillableContainerResource(KillableContainer container,
Resource oldResource, Resource newResource) {
// TODO, will be called when container's resource changed
}
@VisibleForTesting
public Map<ContainerId, RMContainer> getKillableContainersMap(
String queueName, String partition) {
try {
readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Map<ContainerId, RMContainer> containers =
entity.getKillableContainers().get(partition);
if (containers != null) {
return containers;
}
}
return Collections.emptyMap();
}
finally {
readLock.unlock();
}
}
public Iterator<RMContainer> getKillableContainers(String queueName,
String partition) {
return getKillableContainersMap(queueName, partition).values().iterator();
}
public Resource getKillableResource(String queueName, String partition) {
try {
readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Resource res = entity.getTotalKillableResources().get(partition);
if (res == null || res.equals(Resources.none())) {
return Resources.none();
}
return Resources.clone(res);
}
return Resources.none();
}
finally {
readLock.unlock();
}
}
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
try {
readLock.lock();
Map<String, PreemptableQueue> map = new HashMap<>();
for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
String key = entry.getKey();
PreemptableQueue entity = entry.getValue();
map.put(key, new PreemptableQueue(
new HashMap<>(entity.getTotalKillableResources()),
new HashMap<>(entity.getKillableContainers())));
}
return map;
} finally {
readLock.unlock();
}
}
}
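getShallowCopyOfPreemptableEntities() lets the monitoring policy iterate over a stable snapshot without holding the scheduler-side lock for the whole pass. The pattern in isolation (generic types, illustrative only):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SnapshotSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Long> state = new HashMap<>();

    void put(String key, long value) {
        lock.writeLock().lock();
        try {
            state.put(key, value);       // writers take the write lock
        } finally {
            lock.writeLock().unlock();
        }
    }

    Map<String, Long> snapshot() {
        lock.readLock().lock();
        try {
            return new HashMap<>(state); // shallow copy taken under the read lock
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        SnapshotSketch s = new SnapshotSketch();
        s.put("root.a", 2048L);
        System.out.println(s.snapshot()); // {root.a=2048}
    }
}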

View File

@ -120,9 +120,9 @@ public List<AssignmentDetails> getReservationDetails() {
}
private ContainerId getFirstContainerIdFromOperation(Operation op) {
if (null != operationDetails.get(op)) {
List<AssignmentDetails> assignDetails =
operationDetails.get(op);
if (!assignDetails.isEmpty()) {
return assignDetails.get(0).containerId;
}
@ -131,7 +131,7 @@ private ContainerId getFirstContainerIdFromOperation(Operation op) {
}
public ContainerId getFirstAllocatedOrReservedContainerId() {
ContainerId containerId;
containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
if (null != containerId) {
return containerId;

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -94,6 +95,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* to hold the message if its app doesn't get a container from a node
*/
private String appSkipNodeDiagnostics;
private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@ -138,28 +140,30 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
}
containerAllocator = new ContainerAllocator(this, rc, rmContext);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
}
}
public synchronized boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
if (null == liveContainers.remove(containerId)) {
return false;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(containerId);
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@ -176,7 +180,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer,
return true;
}
public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
@ -200,7 +204,9 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
@ -213,17 +219,17 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Inform the container
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ containerId.getApplicationAttemptId()
+ " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
return rmContainer;
}

View File

@ -18,22 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
private Map<ContainerId, RMContainer> killableContainers = new HashMap<>();
private Resource totalKillableResources = Resource.newInstance(0, 0);
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> nodeLabels) {
@ -92,7 +99,6 @@ public synchronized void reserveResource(
@Override
public synchronized void unreserveResource(
SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption
if (getReservedContainer() != null
&& getReservedContainer().getContainer() != null
@ -115,4 +121,55 @@ && getReservedContainer().getContainer().getId()
}
setReservedContainer(null);
}
// According to decisions from the preemption policy, mark the container as killable
public synchronized void markContainerToKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
if (c != null && !killableContainers.containsKey(containerId)) {
killableContainers.put(containerId, c);
Resources.addTo(totalKillableResources, c.getAllocatedResource());
}
}
// According to decisions from the preemption policy, mark the container as
// non-killable
public synchronized void markContainerToNonKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
if (c != null && killableContainers.containsKey(containerId)) {
killableContainers.remove(containerId);
Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());
}
}
@Override
protected synchronized void updateResource(
Container container) {
super.updateResource(container);
if (killableContainers.containsKey(container.getId())) {
Resources.subtractFrom(totalKillableResources, container.getResource());
killableContainers.remove(container.getId());
}
}
@Override
protected synchronized void changeContainerResource(ContainerId containerId,
Resource deltaResource, boolean increase) {
super.changeContainerResource(containerId, deltaResource, increase);
if (killableContainers.containsKey(containerId)) {
if (increase) {
Resources.addTo(totalKillableResources, deltaResource);
} else {
Resources.subtractFrom(totalKillableResources, deltaResource);
}
}
}
public synchronized Resource getTotalKillableResources() {
return totalKillableResources;
}
public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
return killableContainers;
}
}
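The node-side bookkeeping above amounts to a running total plus an idempotent mark/unmark pair. A self-contained model (hypothetical names, memory in MB):

import java.util.HashMap;
import java.util.Map;

public class NodeKillableSketch {
    private final Map<String, Long> killable = new HashMap<>(); // containerId -> MB
    private long totalKillableMb;

    void markKillable(String containerId, long mb) {
        if (killable.putIfAbsent(containerId, mb) == null) {
            totalKillableMb += mb;       // count each container at most once
        }
    }

    void markNonKillable(String containerId) {
        Long mb = killable.remove(containerId);
        if (mb != null) {
            totalKillableMb -= mb;       // keep the total balanced
        }
    }

    public static void main(String[] args) {
        NodeKillableSketch node = new NodeKillableSketch();
        node.markKillable("c1", 2048);
        node.markKillable("c1", 2048);   // duplicate mark is a no-op
        node.markNonKillable("c1");
        System.out.println(node.totalKillableMb); // 0
    }
}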

View File

@ -38,10 +38,15 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
/* Source: SchedulingEditPolicy */
KILL_RESERVED_CONTAINER,
// Mark a container for preemption
MARK_CONTAINER_FOR_PREEMPTION,
// Mark a for-preemption container killable
MARK_CONTAINER_FOR_KILLABLE,
// Cancel a killable container
MARK_CONTAINER_FOR_NONKILLABLE
}

View File

@ -59,7 +59,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
rmDispatcher.getEventHandler().handle(event1);
ContainerPreemptEvent event2 =
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE);
rmDispatcher.getEventHandler().handle(event2);
ContainerPreemptEvent event3 =
new ContainerPreemptEvent(appAttemptId, container,
@ -70,7 +70,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
verify(sched, times(3)).handle(any(SchedulerEvent.class));
verify(sched).killReservedContainer(container);
verify(sched).markContainerForPreemption(appAttemptId, container);
verify(sched).markContainerForKillable(container);
} catch (InterruptedException e) {
Assert.fail();
} finally {

View File

@ -2352,7 +2352,7 @@ public void testRMRestartAfterPreemption() throws Exception {
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app0.getApplicationId()).getCurrentAppAttempt();
// kill app0-attempt
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
app0.getCurrentAppAttempt().getMasterContainer().getId()));
am0.waitForState(RMAppAttemptState.FAILED);
}

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@ -566,7 +565,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt;
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@ -582,7 +581,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
// Preempt the second attempt.
ContainerId amContainer2 =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@ -677,7 +676,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Forcibly preempt the am container;
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());

View File

@ -23,7 +23,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -75,6 +75,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -167,6 +168,7 @@ public void setup() {
when(mCS.getConfiguration()).thenReturn(schedConf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
Dispatcher disp = mock(Dispatcher.class);
@ -289,7 +291,7 @@ public void testExpireKill() {
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId());
assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
}
}

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -123,6 +124,7 @@ public void setup() {
mClock = mock(Clock.class);
cs = mock(CapacityScheduler.class);
when(cs.getResourceCalculator()).thenReturn(rc);
when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
nlm = mock(RMNodeLabelsManager.class);
mDisp = mock(EventHandler.class);

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -264,6 +265,7 @@ public void testLimitsComputation() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource =

View File

@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception {
if (++counter > 2) {
break;
}
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check node report, 12 GB used and 4 GB available
@ -512,7 +512,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 2) { if (++counter > 2) {
break; break;
} }
cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove(); iterator.remove();
} }
@ -542,7 +542,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 1) { if (++counter > 1) {
break; break;
} }
cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove(); iterator.remove();
} }
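The three hunks above all make the same substitution: tests now call markContainerForKillable() where they previously called killPreemptedContainer(). A minimal sketch of the pattern, using only the CapacityScheduler (cs) and scheduler app attempt objects visible in these hunks:

// Look up the RMContainer for an allocated container and mark it killable;
// under lazy preemption the container keeps running until a later
// node-update cycle actually needs its resource.
RMContainer rmContainer = schedulerAppAttempt.getRMContainer(c.getId());
cs.markContainerForKillable(rmContainer);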

View File

@ -1188,7 +1188,7 @@ public void testPreemptionInfo() throws Exception {
// kill the 3 containers // kill the 3 containers
for (Container c : allocatedContainers) { for (Container c : allocatedContainers) {
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
} }
// check values // check values
@ -1197,7 +1197,7 @@ public void testPreemptionInfo() throws Exception {
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
// kill app0-attempt0 AM container // kill app0-attempt0 AM container
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId())); .getCurrentAppAttempt().getMasterContainer().getId()));
// wait for app0 failed // wait for app0 failed
@ -1220,7 +1220,7 @@ public void testPreemptionInfo() throws Exception {
allocatedContainers = allocatedContainers =
am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
for (Container c : allocatedContainers) { for (Container c : allocatedContainers) {
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
} }
// check values // check values
@ -1269,7 +1269,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
} }
// Call killContainer to preempt the container // Call killContainer to preempt the container
cs.killPreemptedContainer(rmContainer); cs.markContainerForKillable(rmContainer);
Assert.assertEquals(3, requests.size()); Assert.assertEquals(3, requests.size());
for (ResourceRequest request : requests) { for (ResourceRequest request : requests) {

View File

@ -0,0 +1,677 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerPreemption {
private static final Log LOG = LogFactory.getLog(
TestCapacitySchedulerPreemption.class);
private final int GB = 1024;
private Configuration conf;
RMNodeLabelsManager mgr;
Clock clock;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
// Set preemption related configurations
conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
0);
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
true);
conf.setFloat(
ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
conf.setFloat(
ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
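    // With wait-time-before-kill = 0 a marked container can be killed on the
    // very next pass, and per-round = 1.0 / natural-termination-factor = 1.0
    // let a single editSchedule() pass target the full computed delta,
    // which keeps these tests deterministic.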
mgr = new NullRMNodeLabelsManager();
mgr.init(this.conf);
clock = mock(Clock.class);
when(clock.getTime()).thenReturn(0L);
}
private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
RMActiveServices activeServices = rm.getRMActiveService();
SchedulingMonitor mon = null;
for (Service service : activeServices.getServices()) {
if (service instanceof SchedulingMonitor) {
mon = (SchedulingMonitor) service;
break;
}
}
if (mon != null) {
return mon.getSchedulingEditPolicy();
}
return null;
}
@Test (timeout = 60000)
public void testSimplePreemption() throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) Two nodes in the cluster, each of them has 4G.
*
* 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
* there's no more resource available.
*
* 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
*
* Now the cluster is full.
*
* 4) app2 asks for another 1G container; the system will preempt one
* container from app1, and app2 will receive the freed resource
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 have available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that one container from app1 is
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(1, killableContainers.size());
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 6 containers, and app2 has 2 containers
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionConsidersNodeLocalityDelay()
throws Exception {
/**
* Test case: same as testSimplePreemption steps 1-3.
*
* Step 4: app2 asks for a 1G container with locality specified, so it needs
* to accumulate missed opportunities before it can be scheduled off-node.
* Check that the system waits for those missed opportunities before
* killing the killable container
*/
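  // Note (assumption, inferred from the behavior exercised below):
  // off-switch assignment is delayed by the capacity scheduler's
  // node-locality-delay mechanism, so the app must miss scheduling
  // opportunities before an off-node container is granted; that is why an
  // extra node-update cycle is needed before the killable container is
  // actually replaced.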
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 have available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container with unknown host and unknown rack
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "unknownhost",
Resources.createResource(1 * GB), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "/default-rack",
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that one container from app1 is
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (no container preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Do allocation again, one container will be preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// App1 has 6 containers, and app2 has 2 containers (new container allocated)
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionConsidersHardNodeLocality()
throws Exception {
/**
* Test case: same as testSimplePreemption steps 1-3.
*
* Step 4: app2 asks for a 1G container with hard locality on host h3,
* which does not exist in the cluster.
* Confirm the system doesn't preempt any container.
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 have available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container for h3 with hard locality,
// h3 doesn't exist in the cluster
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1, true), ResourceRequest
.newInstance(Priority.newInstance(1), "h3",
Resources.createResource(1 * GB), 1, false), ResourceRequest
.newInstance(Priority.newInstance(1), "/default-rack",
Resources.createResource(1 * GB), 1, false)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that one container from app1 is
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (no container preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Do allocation again, nothing will be preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// App1 has 7 containers, and app2 has 1 container (no container allocated)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
throws Exception {
/**
* Test case:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
* Submit applications to two queues, one uses more than the other, so
* preemption will happen.
*
* Check:
* 1) Killable containers' resources will be excluded from PCPP (no duplicate
* container added to the killable list)
* 2) When more resources need to be preempted, new containers will be selected
* and killable containers will be considered
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 6 times for node1
for (int i = 0; i < 6; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// NM1 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that one container from app1 is
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
// Check killable containers and to-be-preempted containers in edit policy
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Run edit schedule again, confirm status doesn't change
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Save the current killable containers
Set<ContainerId> previousKillableContainers = new HashSet<>(
pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
.keySet());
// Increase queue-c's request from 1 to 2 containers, so we need to preempt
// one more container
am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
// Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
// and 1 container in killable map
editPolicy.editSchedule();
Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
// Call editPolicy.editSchedule() once more, we should have 2 containers in the killable map
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Check that the previous killable containers are included in the new killable set
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
Assert.assertTrue(
Sets.difference(previousKillableContainers, killableContainers.keySet())
.isEmpty());
}
@Test (timeout = 60000)
public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
throws Exception {
/**
* Test case:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
* Submit applications to two queues, one uses more than the other, so
* preemption will happen.
*
* Check:
* 1) Containers will be marked killable
* 2) The resource request is then cancelled
* 3) Killable containers will be removed from both the policy and the scheduler
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 6 times for node1
for (int i = 0; i < 6; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// NM1 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that 3 containers from app1 are
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
// Change request from 3G to 2G, now we need to preempt one less container (3->2)
am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
// Call editSchedule once more to make sure still nothing happens
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
}
@Test (timeout = 60000)
public void testPreemptionConsidersUserLimit()
throws Exception {
/**
* Test case: Submit two applications (app1/app2) to different queues; queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more
* than 1 container in queue-c
*
* 1) Two nodes in the cluster, each of them has 4G.
*
* 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
* there's no more resource available.
*
* 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
*
* Now the cluster is full.
*
* 4) app2 asks for another 1G container; because of queue-c's user limit,
* no container will be preempted from app1
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
MockRM rm1 = new MockRM(csConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue "a", AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c, asking for a 1G container for its AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 have available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check that no container from app1 is
// marked "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
// No preemption happens
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
Assert.assertEquals(0, killableContainers.size());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (nothing preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
rm1.close();
}
private Map<ContainerId, RMContainer> waitKillableContainersSize(
PreemptionManager pm, String queueName, String partition,
int expectedSize) throws InterruptedException {
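    // The killable map is populated asynchronously by the preemption
    // policy's monitor thread, so poll until the expected size appears
    // rather than asserting immediately.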
Map<ContainerId, RMContainer> killableContainers =
pm.getKillableContainersMap(queueName, partition);
int wait = 0;
// Wait for at most 5 sec (it should be super fast actually)
while (expectedSize != killableContainers.size() && wait < 500) {
killableContainers = pm.getKillableContainersMap(queueName, partition);
Thread.sleep(10);
wait++;
}
Assert.assertEquals(expectedSize, killableContainers.size());
return killableContainers;
}
}

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -99,6 +100,7 @@ public void setUp() throws Exception {
when(csContext.getResourceCalculator()). when(csContext.getResourceCalculator()).
thenReturn(resourceComparator); thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
} }
private FiCaSchedulerApp getMockApplication(int appId, String user) { private FiCaSchedulerApp getMockApplication(int appId, String user) {

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -150,6 +151,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator); thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()). when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator); thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf); new RMContainerTokenSecretManager(conf);
@ -3092,6 +3094,7 @@ private CapacitySchedulerContext mockCSContext(
Resources.createResource(GB, 1)); Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn( when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(2 * GB, 2)); Resources.createResource(2 * GB, 2));
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
return csContext; return csContext;
} }

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -1676,4 +1677,100 @@ public RMNodeLabelsManager createNodeLabelManager() {
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId())); cs.getApplicationAttempt(am1.getApplicationAttemptId()));
} }
@Test
public void testParentQueueMaxCapsAreRespected() throws Exception {
/*
* Queue tree:
* Root
* / \
* A B
* / \
* A1 A2
*
* A has 50% capacity and 50% max capacity (of label=x)
* A1/A2 has 50% capacity and 100% max capacity (of label=x)
* Cluster has one node (label=x) with resource = 24G.
* So we can at most use 12G resources under queueA.
*/
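    // Worked limit (spelled out from the comment above): max-capacity(A,
    // label=x) = 50% of 24GB = 12GB. app1 uses 1GB (AM) + 2 * 4GB = 9GB,
    // app2's AM adds 2GB (11GB total), so one more 2GB container would
    // exceed the 12GB cap and must not be allocated.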
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 10);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 90);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50);
// Define 2nd-level queues
csConf.setQueues(A, new String[] { "a1",
"a2"});
final String A1 = A + ".a1";
csConf.setCapacity(A1, 50);
csConf.setAccessibleNodeLabels(A1, toSet("x"));
csConf.setCapacityByLabel(A1, "x", 50);
csConf.setMaximumCapacityByLabel(A1, "x", 100);
csConf.setUserLimitFactor(A1, 100.0f);
final String A2 = A + ".a2";
csConf.setCapacity(A2, 50);
csConf.setAccessibleNodeLabels(A2, toSet("x"));
csConf.setCapacityByLabel(A2, "x", 50);
csConf.setMaximumCapacityByLabel(A2, "x", 100);
csConf.setUserLimitFactor(A2, 100.0f);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 =
new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
nm1.registerNode();
// Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
// Try to launch app2 in a2, asking for 2GB; it should succeed
RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
// am2 asks for more resources; this cannot succeed because current used =
// 9G (app1) + 2G (app2) = 11G, and the queue's max capacity = 12G
am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
}
} }

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -92,6 +93,7 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()). when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator); thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()). when(csContext.getResourceCalculator()).
thenReturn(resourceComparator); thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -126,6 +127,7 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getNonPartitionedQueueComparator()).thenReturn( when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator); CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf); conf);

View File

@ -356,4 +356,40 @@ public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
} }
/**
* Get a configuration with the following queue structure:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*/
public static Configuration
getConfigurationWithMultipleQueues(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b", "c" });
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 100);
conf.setUserLimitFactor(A, 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
conf.setMaximumCapacity(B, 100);
conf.setUserLimitFactor(B, 100);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 100);
conf.setUserLimitFactor(C, 100);
return conf;
}
} }
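For reference, the preemption tests added earlier in this commit consume this helper roughly as follows (a minimal sketch using only calls shown in this diff):

Configuration conf = new YarnConfiguration();
conf = TestUtils.getConfigurationWithMultipleQueues(conf);
// root.a / root.b / root.c now carry 10% / 20% / 70% capacity, each with
// 100% maximum capacity and a user-limit-factor of 100.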

View File

@ -1451,7 +1451,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
// Trigger container rescheduled event // Trigger container rescheduled event
scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
SchedulerEventType.KILL_PREEMPTED_CONTAINER)); SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
List<ResourceRequest> requests = rmContainer.getResourceRequests(); List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app // Once recovered, resource request will be present again in app
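The enum rename in this hunk mirrors the killPreemptedContainer -> markContainerForKillable method rename earlier in this commit. A minimal sketch of driving the scheduler with the renamed event, using only the types shown in this hunk:

// Simulate the preemption policy marking a container killable; the
// scheduler then recovers the container's resource requests so the app
// can re-ask for the lost resource.
scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
    SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));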