CapacityScheduler: Improve preemption to only kill containers that would satisfy the incoming request. (Wangda Tan)

Wangda Tan 2016-03-16 16:59:59 -07:00
parent 32d043d9c5
commit 7e8c9beb41
38 changed files with 1785 additions and 186 deletions
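At a high level, this change makes CapacityScheduler preemption lazy: the ProportionalCapacityPreemptionPolicy now marks over-capacity containers as killable instead of killing them outright, and the scheduler only kills a marked container when doing so would actually satisfy an incoming request. The behavior is gated by the new lazy-preemption flag added to CapacitySchedulerConfiguration in this patch; as a minimal sketch (not part of the patch, using only the APIs introduced below), the flag could be flipped programmatically like this:

// Sketch only: enable lazy preemption via the flag this commit adds to
// CapacitySchedulerConfiguration (default is DEFAULT_LAZY_PREEMPTION_ENABLED = false).
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, true);
assert csConf.getLazyPreemptionEnabled();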

View File

@ -35,7 +35,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@ -125,8 +126,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private long maxWaitTime;
private CapacityScheduler scheduler;
private long monitoringInterval;
private final Map<RMContainer,Long> preempted =
new HashMap<RMContainer,Long>();
private final Map<RMContainer, Long> preempted = new HashMap<>();
private ResourceCalculator rc;
private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor;
@ -135,6 +136,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
new HashMap<>();
private RMNodeLabelsManager nlm;
// Preemptable Entities, synced from scheduler at every run
private Map<String, PreemptableQueue> preemptableEntities = null;
private Set<ContainerId> killableContainers;
public ProportionalCapacityPreemptionPolicy() {
clock = SystemClock.getInstance();
}
@ -184,6 +189,64 @@ public void editSchedule() {
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources);
}
@SuppressWarnings("unchecked")
private void cleanupStaledKillableContainers(Resource cluster,
Set<String> leafQueueNames) {
for (String q : leafQueueNames) {
for (TempQueuePerPartition tq : getQueuePartitions(q)) {
// When a queue's (used - killable) <= ideal-assigned and killable > 0, we need
// to check whether any killable containers should be reverted
if (Resources.lessThanOrEqual(rc, cluster,
Resources.subtract(tq.current, tq.killable), tq.idealAssigned)
&& Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) {
// How many killable resources need to be reverted
// need-to-revert = already-marked-killable - (current - ideal)
Resource toBeRevertedFromKillable = Resources.subtract(tq.killable,
Resources.subtract(tq.current, tq.idealAssigned));
Resource alreadyReverted = Resources.createResource(0);
for (RMContainer c : preemptableEntities.get(q).getKillableContainers(
tq.partition).values()) {
if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted,
toBeRevertedFromKillable)) {
break;
}
if (Resources.greaterThan(rc, cluster,
Resources.add(alreadyReverted, c.getAllocatedResource()),
toBeRevertedFromKillable)) {
continue;
} else {
// This container needs to be marked as non-killable
Resources.addTo(alreadyReverted, c.getAllocatedResource());
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(c.getApplicationAttemptId(), c,
SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE));
}
}
}
}
}
}
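// Editor's note, hypothetical numbers (not from the patch): suppose one partition of
// queue q has current = 10G, killable = 4G, idealAssigned = 8G. Then current - killable
// = 6G <= idealAssigned and killable > 0, so we over-marked containers:
// toBeRevertedFromKillable = killable - (current - idealAssigned) = 4G - 2G = 2G, and
// killable containers are sent MARK_CONTAINER_FOR_NONKILLABLE events until roughly 2G
// has been reverted.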
private void syncKillableContainersFromScheduler() {
// sync preemptable entities from scheduler
preemptableEntities =
scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities();
killableContainers = new HashSet<>();
for (Map.Entry<String, PreemptableQueue> entry : preemptableEntities
.entrySet()) {
PreemptableQueue entity = entry.getValue();
for (Map<ContainerId, RMContainer> map : entity.getKillableContainers()
.values()) {
killableContainers.addAll(map.keySet());
}
}
}
/**
* This method selects and tracks containers to be preempted. If a container
@ -201,6 +264,8 @@ private void containerBasedPreemptOrKill(CSQueue root,
.getNodeLabelManager().getClusterNodeLabelNames());
allPartitions.add(RMNodeLabelsManager.NO_LABEL);
syncKillableContainersFromScheduler();
// extract a summary of the queues from scheduler
synchronized (scheduler) {
queueToPartitions.clear();
@ -228,13 +293,17 @@ private void containerBasedPreemptOrKill(CSQueue root,
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
}
// Remove containers from the killable list when we want to preempt fewer resources
// from the queue.
cleanupStaledKillableContainers(clusterResources, leafQueueNames);
// based on ideal allocation select containers to be preempted from each
// queue and each application
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(leafQueueNames, clusterResources);
if (LOG.isDebugEnabled()) {
logToCSV(new ArrayList<String>(leafQueueNames));
logToCSV(new ArrayList<>(leafQueueNames));
}
// if we are in observeOnly mode return before any action is taken
@ -254,10 +323,10 @@ private void containerBasedPreemptOrKill(CSQueue root,
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
// mark container killable
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.KILL_PREEMPTED_CONTAINER));
SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
preempted.remove(container);
} else {
if (preempted.get(container) != null) {
@ -333,14 +402,14 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
// qAlloc tracks currently active queues (will decrease progressively as
// demand is met)
List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
// unassigned tracks how much resources are still to assign, initialized
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
// group queues based on whether they have non-zero guaranteed capacity
Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
for (TempQueuePerPartition q : qAlloc) {
if (Resources
@ -415,8 +484,8 @@ private void computeFixpointAllocation(ResourceCalculator rc,
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
PriorityQueue<TempQueuePerPartition> orderedByNeed =
new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
TempQueuePerPartition q = i.next();
if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
@ -474,7 +543,7 @@ private void computeFixpointAllocation(ResourceCalculator rc,
// percentage of guaranteed.
protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
underserved.add(q1);
@ -502,7 +571,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
q.normalizedGuarantee = 1.0f / queues.size();
}
} else {
for (TempQueuePerPartition q : queues) {
@ -515,8 +584,9 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
}
}
private String getPartitionByNodeId(NodeId nodeId) {
return scheduler.getSchedulerNode(nodeId).getPartition();
private String getPartitionByRMContainer(RMContainer rmContainer) {
return scheduler.getSchedulerNode(rmContainer.getAllocatedNode())
.getPartition();
}
/**
@ -534,7 +604,7 @@ private boolean tryPreemptContainerAndDeductResToObtain(
return false;
}
String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
String nodePartition = getPartitionByRMContainer(rmContainer);
Resource toObtainByPartition =
resourceToObtainByPartitions.get(nodePartition);
@ -575,7 +645,7 @@ private void addToPreemptMap(
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
Set<RMContainer> set;
if (null == (set = preemptMap.get(appAttemptId))) {
set = new HashSet<RMContainer>();
set = new HashSet<>();
preemptMap.put(appAttemptId, set);
}
set.add(containerToPreempt);
@ -587,7 +657,7 @@ private void addToPreemptMap(
* over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
* account for containers that will naturally complete.
*
* @param queues set of leaf queues to preempt from
* @param leafQueueNames set of leaf queues to preempt from
* @param clusterResource total amount of cluster resources
* @return a map of applicationID to the set of containers to preempt
*/
@ -595,8 +665,8 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
Set<String> leafQueueNames, Resource clusterResource) {
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
new HashMap<ApplicationAttemptId, Set<RMContainer>>();
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
new HashMap<>();
List<RMContainer> skippedAMContainerlist = new ArrayList<>();
// Loop all leaf queues
for (String queueName : leafQueueNames) {
@ -614,7 +684,7 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
LeafQueue leafQueue = null;
Map<String, Resource> resToObtainByPartition =
new HashMap<String, Resource>();
new HashMap<>();
for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
leafQueue = qT.leafQueue;
// we act only if we are violating balance by more than
@ -703,7 +773,6 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
* @param clusterResource
* @param preemptMap
* @param skippedAMContainerlist
* @param resToObtain
* @param skippedAMSize
* @param maxAMCapacityForThisQueue
*/
@ -751,7 +820,7 @@ private void preemptFrom(FiCaSchedulerApp app,
// first drop reserved containers towards rsrcPreempt
List<RMContainer> reservedContainers =
new ArrayList<RMContainer>(app.getReservedContainers());
new ArrayList<>(app.getReservedContainers());
for (RMContainer c : reservedContainers) {
if (resToObtainByPartition.isEmpty()) {
return;
@ -771,8 +840,7 @@ private void preemptFrom(FiCaSchedulerApp app,
// if more resources are to be freed go through all live containers in
// reverse priority and reverse allocation order and mark them for
// preemption
List<RMContainer> liveContainers =
new ArrayList<RMContainer>(app.getLiveContainers());
List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
sortContainers(liveContainers);
@ -788,6 +856,11 @@ private void preemptFrom(FiCaSchedulerApp app,
continue;
}
// Skip containers that are already marked killable
if (killableContainers.contains(c.getContainerId())) {
continue;
}
// Try to preempt this container
tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
clusterResource, preemptMap);
@ -826,6 +899,10 @@ public String getPolicyName() {
return "ProportionalCapacityPreemptionPolicy";
}
@VisibleForTesting
public Map<RMContainer, Long> getToPreemptContainers() {
return preempted;
}
/**
* This method walks a tree of CSQueue and clones the portion of the state
@ -851,6 +928,11 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
partitionToLookAt);
Resource guaranteed = Resources.multiply(partitionResource, absCap);
Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
Resource killable = Resources.none();
if (null != preemptableEntities.get(queueName)) {
killable = preemptableEntities.get(queueName)
.getKillableResource(partitionToLookAt);
}
// when partition is a non-exclusive partition, the actual maxCapacity
// could be more than the specified maxCapacity
@ -875,7 +957,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
l.getTotalPendingResourcesConsideringUserLimit(
partitionResource, partitionToLookAt);
ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
maxCapacity, preemptionDisabled, partitionToLookAt);
maxCapacity, preemptionDisabled, partitionToLookAt, killable);
if (preemptionDisabled) {
ret.untouchableExtra = extra;
} else {
@ -886,7 +968,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
Resource pending = Resource.newInstance(0, 0);
ret =
new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
guaranteed, maxCapacity, false, partitionToLookAt);
guaranteed, maxCapacity, false, partitionToLookAt, killable);
Resource childrensPreemptable = Resource.newInstance(0, 0);
for (CSQueue c : curQueue.getChildQueues()) {
TempQueuePerPartition subq =
@ -932,7 +1014,7 @@ private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
Map<String, TempQueuePerPartition> queuePartitions;
if (null == (queuePartitions = queueToPartitions.get(queueName))) {
queuePartitions = new HashMap<String, TempQueuePerPartition>();
queuePartitions = new HashMap<>();
queueToPartitions.put(queueName, queuePartitions);
}
queuePartitions.put(queuePartition.partition, queuePartition);
@ -971,8 +1053,10 @@ static class TempQueuePerPartition {
final Resource guaranteed;
final Resource maxCapacity;
final String partition;
final Resource killable;
Resource idealAssigned;
Resource toBePreempted;
// For logging purposes
Resource actuallyPreempted;
Resource untouchableExtra;
@ -986,7 +1070,7 @@ static class TempQueuePerPartition {
TempQueuePerPartition(String queueName, Resource current, Resource pending,
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
String partition) {
String partition, Resource killableResource) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
@ -996,11 +1080,12 @@ static class TempQueuePerPartition {
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<TempQueuePerPartition>();
this.children = new ArrayList<>();
this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0);
this.preemptionDisabled = preemptionDisabled;
this.partition = partition;
this.killable = killableResource;
}
public void setLeafQueue(LeafQueue l){
@ -1018,12 +1103,6 @@ public void addChild(TempQueuePerPartition q) {
Resources.addTo(pending, q.pending);
}
public void addChildren(ArrayList<TempQueuePerPartition> queues) {
assert leafQueue == null;
children.addAll(queues);
}
public ArrayList<TempQueuePerPartition> getChildren(){
return children;
}
@ -1064,18 +1143,13 @@ public String toString() {
return sb.toString();
}
public void printAll() {
LOG.info(this.toString());
for (TempQueuePerPartition sub : this.getChildren()) {
sub.printAll();
}
}
public void assignPreemption(float scalingFactor,
ResourceCalculator rc, Resource clusterResource) {
if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
toBePreempted = Resources.multiply(
Resources.subtract(current, idealAssigned), scalingFactor);
if (Resources.greaterThan(rc, clusterResource,
Resources.subtract(current, killable), idealAssigned)) {
toBePreempted = Resources.multiply(Resources.subtract(
Resources.subtract(current, killable), idealAssigned),
scalingFactor);
} else {
toBePreempted = Resource.newInstance(0, 0);
}
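// Editor's note, hypothetical numbers (not from the patch): with current = 12G,
// killable = 2G, idealAssigned = 8G and scalingFactor = 0.5, the new formula gives
// toBePreempted = ((12 - 2) - 8) * 0.5 = 1G, whereas the old formula (which ignored
// killable) would have asked for (12 - 8) * 0.5 = 2G.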

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
/**
* Represents the ResourceManager's view of an application container. See

View File

@ -45,6 +45,6 @@ public interface PreemptableResourceScheduler extends ResourceScheduler {
* Ask the scheduler to forcibly interrupt the container given as input
* @param container
*/
void killPreemptedContainer(RMContainer container);
void markContainerForKillable(RMContainer container);
}

View File

@ -38,6 +38,8 @@ public class ResourceLimits {
// containers.
private volatile Resource headroom;
private boolean allowPreempt = false;
public ResourceLimits(Resource limit) {
this(limit, Resources.none());
}
@ -72,4 +74,11 @@ public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
this.amountNeededUnreserve = amountNeededUnreserve;
}
public boolean isAllowPreemption() {
return allowPreempt;
}
public void setIsAllowPreemption(boolean allowPreempt) {
this.allowPreempt = allowPreempt;
}
}

View File

@ -64,9 +64,8 @@ public abstract class SchedulerNode {
private volatile ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
/** Set of containers that are allocated containers. */
private final Map<ContainerId, RMContainer> launchedContainers =
/* Set of containers allocated on this node */
protected final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<>();
private final RMNode rmNode;
@ -168,7 +167,7 @@ public synchronized void allocateContainer(RMContainer rmContainer) {
* @param deltaResource Change in the resource allocation.
* @param increase True if the change is an increase of allocation.
*/
private synchronized void changeContainerResource(ContainerId containerId,
protected synchronized void changeContainerResource(ContainerId containerId,
Resource deltaResource, boolean increase) {
if (increase) {
deductUnallocatedResource(deltaResource);
@ -242,7 +241,7 @@ public synchronized boolean isValidContainer(ContainerId containerId) {
* Update the resources of the node when allocating a new container.
* @param container Container to allocate.
*/
private synchronized void updateResource(Container container) {
protected synchronized void updateResource(Container container) {
addUnallocatedResource(container.getResource());
--numContainers;
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -45,6 +46,7 @@
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -440,11 +442,8 @@ private Resource getCurrentLimitResource(String nodePartition,
Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
}
return queueMaxResource;
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
// When we are doing non-exclusive resource allocation, the maximum capacity of
// all queues on this label equals the total resource with the label.
@ -474,12 +473,19 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
// Set headroom for currentResourceLimits
currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
nowTotalUsed));
// Set headroom for currentResourceLimits:
// When queue is a parent queue: Headroom = limit - used + killable
// When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
Resource usedExceptKillable = nowTotalUsed;
if (null != getChildQueues() && !getChildQueues().isEmpty()) {
usedExceptKillable = Resources.subtract(nowTotalUsed,
getTotalKillableResource(nodePartition));
}
currentResourceLimits.setHeadroom(
Resources.subtract(currentLimitResource, usedExceptKillable));
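// Editor's note, hypothetical numbers (not from the patch): with limit = 20G,
// used = 18G and killable = 3G, a parent queue gets usedExceptKillable = 15G and
// headroom = 5G, while a leaf queue keeps headroom = 20G - 18G = 2G because it
// cannot preempt its own containers.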
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
nowTotalUsed, currentLimitResource)) {
usedExceptKillable, currentLimitResource)) {
// if reservation continuous looking is enabled, check to see if we could
// potentially use this node instead of a reserved node if the application
@ -491,7 +497,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);
Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
@ -620,11 +626,10 @@ public Set<String> getNodeLabelsForQueue() {
// considering all labels in the cluster, only those labels under which
// this queue uses some resource can be considered.
Set<String> nodeLabels = new HashSet<String>();
if (this.getAccessibleNodeLabels() != null
&& this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
nodeLabels.addAll(Sets.union(this.getQueueCapacities()
.getNodePartitionsSet(), this.getQueueResourceUsage()
.getNodePartitionsSet()));
if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels()
.contains(RMNodeLabelsManager.ANY)) {
nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(),
this.getQueueResourceUsage().getNodePartitionsSet()));
} else {
nodeLabels.addAll(this.getAccessibleNodeLabels());
}
@ -636,4 +641,14 @@ public Set<String> getNodeLabelsForQueue() {
}
return nodeLabels;
}
public Resource getTotalKillableResource(String partition) {
return csContext.getPreemptionManager().getKillableResource(queueName,
partition);
}
public Iterator<RMContainer> getKillableContainers(String partition) {
return csContext.getPreemptionManager().getKillableContainers(queueName,
partition);
}
}

View File

@ -26,6 +26,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.List;
@Private
@Unstable
public class CSAssignment {
@ -42,6 +44,7 @@ public class CSAssignment {
private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation;
private boolean increaseAllocation;
private List<RMContainer> containersToKill;
public CSAssignment(Resource resource, NodeType type) {
this(resource, type, null, null, false, false);
@ -147,4 +150,12 @@ public boolean isIncreasedAllocation() {
public void setIncreasedAllocation(boolean flag) {
increaseAllocation = flag;
}
public void setContainersToKill(List<RMContainer> containersToKill) {
this.containersToKill = containersToKill;
}
public List<RMContainer> getContainersToKill() {
return containersToKill;
}
}

View File

@ -108,6 +108,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -148,6 +150,10 @@ public class CapacityScheduler extends
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
private PreemptionManager preemptionManager = new PreemptionManager();
private volatile boolean isLazyPreemptionEnabled = false;
static final Comparator<CSQueue> nonPartitionedQueueComparator =
new Comparator<CSQueue>() {
@Override
@ -298,12 +304,11 @@ private synchronized void initScheduler(Configuration configuration) throws
initMaximumResourceCapability(this.conf.getMaximumAllocation());
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval =
@ -369,6 +374,9 @@ public void serviceStop() throws Exception {
refreshMaximumAllocation(this.conf.getMaximumAllocation());
throw new IOException("Failed to re-init queues", t);
}
// update lazy preemption
this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
}
long getAsyncScheduleInterval() {
@ -503,6 +511,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
LOG.info("Initialized root queue " + root);
updatePlacementRules();
setQueueAcls(authorizer, queues);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, root);
}
@Lock(CapacityScheduler.class)
@ -531,6 +542,9 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
labelManager.reinitializeQueueLabels(getQueueToLabels());
setQueueAcls(authorizer, queues);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, root);
}
@VisibleForTesting
@ -1253,8 +1267,10 @@ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
if (calculator.computeAvailableContainers(node.getUnallocatedResource(),
minimumAllocation) > 0) {
if (calculator.computeAvailableContainers(Resources
.add(node.getUnallocatedResource(), node.getTotalKillableResources()),
minimumAllocation) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getUnallocatedResource());
@ -1263,10 +1279,8 @@ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
assignment = root.assignContainers(
getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, getClusterResource())),
node.getPartition(), getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
@ -1436,11 +1450,20 @@ public void handle(SchedulerEvent event) {
markContainerForPreemption(aid, containerToBePreempted);
}
break;
case KILL_PREEMPTED_CONTAINER:
case MARK_CONTAINER_FOR_KILLABLE:
{
ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
RMContainer containerToBeKilled = killContainerEvent.getContainer();
killPreemptedContainer(containerToBeKilled);
ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent)event;
RMContainer killableContainer = containerKillableEvent.getContainer();
markContainerForKillable(killableContainer);
}
break;
case MARK_CONTAINER_FOR_NONKILLABLE:
{
if (isLazyPreemptionEnabled) {
ContainerPreemptEvent cancelKillContainerEvent =
(ContainerPreemptEvent) event;
markContainerForNonKillable(cancelKillContainerEvent.getContainer());
}
}
break;
default:
@ -1548,14 +1571,14 @@ private void rollbackContainerResource(
protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Get the application for the finished container
FiCaSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
containerId.getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" + " finished application "
+ appId + " completed with event " + event);
@ -1569,15 +1592,6 @@ protected void completedContainerInternal(
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(getClusterResource(), application, node,
rmContainer, containerStatus, event, null, true);
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
container.getId(), queue.getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(),
container.getId(), queue.getQueuePath());
}
}
@Override
@ -1613,7 +1627,7 @@ public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
return super.getApplicationAttempt(applicationAttemptId);
}
@Lock(Lock.NoLock.class)
public FiCaSchedulerNode getNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
@ -1654,15 +1668,60 @@ public void markContainerForPreemption(ApplicationAttemptId aid,
}
}
@Override
public void killPreemptedContainer(RMContainer cont) {
public synchronized void markContainerForKillable(
RMContainer killableContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container"
+ cont.toString());
LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+ killableContainer.toString());
}
if (!isLazyPreemptionEnabled) {
super.completedContainer(killableContainer, SchedulerUtils
.createPreemptedContainerStatus(killableContainer.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
} else {
FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
killableContainer.getAllocatedNode());
FiCaSchedulerApp application = getCurrentAttemptForContainer(
killableContainer.getContainerId());
node.markContainerToKillable(killableContainer.getContainerId());
// notify the PreemptionManager (it needs the leaf queue of the container's application)
if (null != application) {
String leafQueueName = application.getCSLeafQueue().getQueueName();
getPreemptionManager().addKillableContainer(
new KillableContainer(killableContainer, node.getPartition(),
leafQueueName));
}
}
}
private synchronized void markContainerForNonKillable(
RMContainer nonKillableContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
+ nonKillableContainer.toString());
}
FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
nonKillableContainer.getAllocatedNode());
FiCaSchedulerApp application = getCurrentAttemptForContainer(
nonKillableContainer.getContainerId());
node.markContainerToNonKillable(nonKillableContainer.getContainerId());
// notify the PreemptionManager (it needs the leaf queue of the container's application)
if (null != application) {
String leafQueueName = application.getCSLeafQueue().getQueueName();
getPreemptionManager().removeKillableContainer(
new KillableContainer(nonKillableContainer, node.getPartition(),
leafQueueName));
}
super.completedContainer(cont, SchedulerUtils
.createPreemptedContainerStatus(cont.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
}
@Override
@ -1945,6 +2004,7 @@ public Set<String> getPlanQueues() {
return ret;
}
@Override
public SchedulerHealth getSchedulerHealth() {
return this.schedulerHealth;
}
@ -1953,6 +2013,11 @@ private void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
@Override
public long getLastNodeUpdateTime() {
return lastNodeUpdateTime;
}
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
@ -2054,4 +2119,9 @@ public void updateApplicationPriority(Priority newPriority,
+ rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser());
}
@Override
public PreemptionManager getPreemptionManager() {
return preemptionManager;
}
}

View File

@ -257,6 +257,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String RESERVATION_ENFORCEMENT_WINDOW =
"reservation-enforcement-window";
@Private
public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled";
@Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@ -1007,7 +1013,11 @@ public void setOrderingPolicy(String queue, String policy) {
@VisibleForTesting
public void setOrderingPolicyParameter(String queue,
String parameterKey, String parameterValue) {
set(getQueuePrefix(queue) + ORDERING_POLICY + "."
+ parameterKey, parameterValue);
set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey,
parameterValue);
}
public boolean getLazyPreemptionEnabled() {
return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
}
}

View File

@ -18,17 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Comparator;
/**
* Read-only interface to {@link CapacityScheduler} context.
*/
@ -61,4 +64,12 @@ public interface CapacitySchedulerContext {
PartitionedQueueComparator getPartitionedQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId);
PreemptionManager getPreemptionManager();
SchedulerHealth getSchedulerHealth();
long getLastNodeUpdateTime();
}

View File

@ -37,9 +37,11 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -63,7 +65,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
@ -823,6 +827,40 @@ private void handleExcessReservedContainer(Resource clusterResource,
assignment.setExcessReservation(null);
}
}
private void killToPreemptContainers(Resource clusterResource,
FiCaSchedulerNode node,
CSAssignment assignment) {
if (assignment.getContainersToKill() != null) {
StringBuilder sb = new StringBuilder("Killing containers: [");
for (RMContainer c : assignment.getContainersToKill()) {
FiCaSchedulerApp application = csContext.getApplicationAttempt(
c.getApplicationAttemptId());
LeafQueue q = application.getCSLeafQueue();
q.completedContainer(clusterResource, application, node, c, SchedulerUtils
.createPreemptedContainerStatus(c.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
null, false);
sb.append("(container=" + c.getContainerId() + " resource=" + c
.getAllocatedResource() + ")");
}
sb.append("] for container=" + assignment.getAssignmentInformation()
.getFirstAllocatedOrReservedContainerId() + " resource=" + assignment
.getResource());
LOG.info(sb.toString());
}
}
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
// Set preemption-allowed:
// For a leaf queue, only an under-utilized queue is allowed to preempt resources from other queues
float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
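// Editor's note, hypothetical numbers (not from the patch): if absoluteUsedCapacity
// = 0.3 and absoluteCapacity = 0.5 for this partition, the queue is under its
// guarantee, so isAllowPreemption() becomes true and the allocator changed later in
// this patch may kill killable containers to place this queue's request; an
// over-utilized queue gets false.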
}
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
@ -835,6 +873,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ " #applications=" + orderingPolicy.getNumSchedulableEntities());
}
setPreemptionAllowed(currentResourceLimits, node.getPartition());
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
@ -846,6 +886,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
currentResourceLimits, schedulingMode, reservedContainer);
handleExcessReservedContainer(clusterResource, assignment, node,
application);
killToPreemptContainers(clusterResource, node, assignment);
return assignment;
}
}
@ -907,6 +948,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
handleExcessReservedContainer(clusterResource, assignment, node,
application);
killToPreemptContainers(clusterResource, node, assignment);
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
@ -1210,11 +1252,34 @@ public void unreserveIncreasedContainer(Resource clusterResource,
}
}
private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
if (null == schedulerHealth) {
// Only do update if we have schedulerHealth
return;
}
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
rmContainer.getContainerId(), getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
getQueuePath());
}
}
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
// Update SchedulerHealth for released / preempted container
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
if (application != null) {
// unreserve container increase request if it previously reserved.
if (rmContainer.hasIncreaseReservation()) {
@ -1265,6 +1330,10 @@ public void completedContainer(Resource clusterResource,
rmContainer, null, event, this, sortQueues);
}
}
// Notify PreemptionManager
csContext.getPreemptionManager().removeKillableContainer(
new KillableContainer(rmContainer, node.getPartition(), queueName));
}
synchronized void allocateResource(Resource clusterResource,

View File

@ -18,18 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,6 +37,7 @@
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -57,12 +46,25 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@ -386,6 +388,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it is not able to access partition=" + node
.getPartition());
}
return CSAssignment.NULL_ASSIGNMENT;
}
@ -431,7 +438,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
super.allocateResource(clusterResource, assignedToChild.getResource(),
allocateResource(clusterResource, assignedToChild.getResource(),
node.getPartition(), assignedToChild.isIncreasedAllocation());
// Track resource utilization in this pass of the scheduler
@ -494,29 +501,38 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getUnallocatedResource(), minimumAllocation);
// Two conditions need to be met when trying to allocate:
// 1) Node doesn't have a reserved container
// 2) Node's available-resource + killable-resource should be > 0
return node.getReservedContainer() == null && Resources.greaterThanOrEqual(
resourceCalculator, clusterResource, Resources
.add(node.getUnallocatedResource(), node.getTotalKillableResources()),
minimumAllocation);
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits parentLimits) {
Resource clusterResource, ResourceLimits parentLimits,
String nodePartition) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// Parent available resource = parent-limit - parent-used-resource
Resource parentMaxAvailableResource =
Resources.subtract(parentLimits.getLimit(), getUsedResources());
Resource parentMaxAvailableResource = Resources.subtract(
parentLimits.getLimit(), queueUsage.getUsed(nodePartition));
// Deduct killable from used
Resources.addTo(parentMaxAvailableResource,
getTotalKillableResource(nodePartition));
// Child's limit = parent-available-resource + child-used
Resource childLimit =
Resources.add(parentMaxAvailableResource, child.getUsedResources());
Resource childLimit = Resources.add(parentMaxAvailableResource,
child.getQueueResourceUsage().getUsed(nodePartition));
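// Editor's note, hypothetical numbers (not from the patch): with parent limit = 100G,
// parent used on this partition = 80G, parent killable = 10G and child used = 30G,
// parentMaxAvailableResource = 100 - 80 + 10 = 30G and childLimit = 30 + 30 = 60G,
// which is then capped by the child's configured max resource below.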
// Get child's max resource
Resource childConfiguredMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
child.getAbsoluteMaximumCapacity(), minimumAllocation);
Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition),
minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
@ -568,7 +584,7 @@ private synchronized CSAssignment assignContainersToChildQueues(
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition());
assignment = childQueue.assignContainers(cluster, node,
childLimits, schedulingMode);
@ -714,8 +730,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
// Update all children
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits);
ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL);
childQueue.updateClusterResource(clusterResource, childLimits);
}
@ -738,8 +754,8 @@ public void recoverContainer(Resource clusterResource,
synchronized (this) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), node.getPartition(), false);
allocateResource(clusterResource,
rmContainer.getContainer().getResource(), node.getPartition(), false);
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@ -766,7 +782,7 @@ public void attachContainer(Resource clusterResource,
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), node.getPartition(), false);
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
@ -802,4 +818,79 @@ public void detachContainer(Resource clusterResource,
public synchronized int getNumApplications() {
return numApplications;
}
synchronized void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
super.allocateResource(clusterResource, resource, nodePartition,
changeContainerResource);
/**
* Check whether we need to kill (killable) containers if the maximum resource is violated.
* We do this because killable resources are deducted from used when walking down from the root.
* For example:
* <pre>
* Root
* / \
* a b
* / \
* a1 a2
* </pre>
*
* a: max=10G, used=10G, killable=2G
* a1: used=8G, killable=2G
* a2: used=2G, pending=2G, killable=0G
*
* When we ask queue-a to allocate resources, even though queue-a has
* reached its max resource, we deduct killable from its used, so we can still allocate
* at most 2G. The ResourceLimits passed down to a2 has headroom set to 2G.
*
* If the scheduler finds 2G of available resources in the cluster and assigns them
* to a2, then a2's used = 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G.
*
* When this happens, we have to preempt killable containers (on the same or different
* nodes) of the parent queue to avoid violating the parent's max resource.
*/
if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
< getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
}
}
private void killContainersToEnforceMaxQueueCapacity(String partition,
Resource clusterResource) {
Iterator<RMContainer> killableContainerIter = getKillableContainers(
partition);
if (!killableContainerIter.hasNext()) {
return;
}
Resource partitionResource = labelManager.getResourceByLabel(partition,
null);
Resource maxResource = Resources.multiply(partitionResource,
getQueueCapacities().getAbsoluteMaximumCapacity(partition));
while (Resources.greaterThan(resourceCalculator, partitionResource,
queueUsage.getUsed(partition), maxResource)) {
RMContainer toKillContainer = killableContainerIter.next();
FiCaSchedulerApp attempt = csContext.getApplicationAttempt(
toKillContainer.getContainerId().getApplicationAttemptId());
FiCaSchedulerNode node = csContext.getNode(
toKillContainer.getAllocatedNode());
if (null != attempt && null != node) {
LeafQueue lq = attempt.getCSLeafQueue();
lq.completedContainer(clusterResource, attempt, node, toKillContainer,
SchedulerUtils.createPreemptedContainerStatus(
toKillContainer.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
null, false);
LOG.info("Killed container=" + toKillContainer.getContainerId()
+ " from queue=" + lq.getQueueName() + " to make queue=" + this
.getQueueName() + "'s max-capacity enforced");
}
if (!killableContainerIter.hasNext()) {
break;
}
}
}
}

View File

@ -108,6 +108,8 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
assignment.setFulfilledReservation(true);
}
}
assignment.setContainersToKill(result.getToKillContainers());
}
return assignment;

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.List;
public class ContainerAllocation {
/**
* Skip the locality (e.g. node-local, rack-local, any), and look at other
@ -56,6 +59,7 @@ public class ContainerAllocation {
NodeType containerNodeType = NodeType.NODE_LOCAL;
NodeType requestNodeType = NodeType.NODE_LOCAL;
Container updatedContainer;
private List<RMContainer> toKillContainers;
public ContainerAllocation(RMContainer containerToBeUnreserved,
Resource resourceToBeAllocated, AllocationState state) {
@ -86,4 +90,12 @@ public NodeType getContainerNodeType() {
public Container getUpdatedContainer() {
return updatedContainer;
}
public void setToKillContainers(List<RMContainer> toKillContainers) {
this.toKillContainers = toKillContainers;
}
public List<RMContainer> getToKillContainers() {
return toKillContainers;
}
}

View File

@ -42,6 +42,9 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.List;
/**
* Allocates normal (new) containers, considering locality/label, etc. Uses a
* delayed scheduling mechanism to get better locality allocation.
@ -435,9 +438,6 @@ private ContainerAllocation assignContainer(Resource clusterResource,
return ContainerAllocation.LOCALITY_SKIPPED;
}
assert Resources.greaterThan(
rc, clusterResource, available, Resources.none());
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
priority, capability);
@ -460,6 +460,29 @@ private ContainerAllocation assignContainer(Resource clusterResource,
boolean reservationsContinueLooking =
application.getCSLeafQueue().getReservationContinueLooking();
// Check if we need to kill some containers to allocate this one
List<RMContainer> toKillContainers = null;
if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) {
Resource availableAndKillable = Resources.clone(available);
for (RMContainer killableContainer : node
.getKillableContainers().values()) {
if (null == toKillContainers) {
toKillContainers = new ArrayList<>();
}
toKillContainers.add(killableContainer);
Resources.addTo(availableAndKillable,
killableContainer.getAllocatedResource());
if (Resources.fitsIn(rc,
clusterResource,
capability,
availableAndKillable)) {
// Stop if we find enough space
availableContainers = 1;
break;
}
}
}
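// Editor's note, hypothetical numbers (not from the patch): with available = 1G, a
// requested capability of 3G and one killable container of 2G on the node,
// availableAndKillable grows to 3G, the request fits, so that container is added to
// toKillContainers and availableContainers is set to 1 to let the allocation proceed.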
if (availableContainers > 0) {
// Allocate...
// We will only do continuous reservation when this is not allocated from
@ -499,12 +522,12 @@ private ContainerAllocation assignContainer(Resource clusterResource,
new ContainerAllocation(unreservedContainer, request.getCapability(),
AllocationState.ALLOCATED);
result.containerNodeType = type;
result.setToKillContainers(toKillContainers);
return result;
} else {
// if we are allowed to allocate but this node doesn't have space, reserve
// it, or if this was already a reserved container, reserve it again
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly be ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we didn't need to
@ -522,6 +545,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
new ContainerAllocation(null, request.getCapability(),
AllocationState.RESERVED);
result.containerNodeType = type;
result.setToKillContainers(null);
return result;
}
// Skip the locality request
@ -613,8 +637,7 @@ private ContainerAllocation handleNewContainerAllocation(
}
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, Priority priority,
FiCaSchedulerNode node, Priority priority,
RMContainer reservedContainer) {
// Create the container if necessary
Container container =
@ -678,9 +701,7 @@ private ContainerAllocation allocate(Resource clusterResource,
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result =
doAllocation(result, clusterResource, node, schedulingMode, priority,
reservedContainer);
result = doAllocation(result, node, priority, reservedContainer);
}
return result;

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class KillableContainer {
RMContainer container;
String partition;
String leafQueueName;
public KillableContainer(RMContainer container, String partition, String leafQueueName) {
this.container = container;
this.partition = partition;
this.leafQueueName = leafQueueName;
}
public RMContainer getRMContainer() {
return this.container;
}
public String getNodePartition() {
return this.partition;
}
public String getLeafQueueName() {
return this.leafQueueName;
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
public class PreemptableQueue {
// Partition -> killable resources and containers
private Map<String, Resource> totalKillableResources = new HashMap<>();
private Map<String, Map<ContainerId, RMContainer>> killableContainers =
new HashMap<>();
private PreemptableQueue parent;
public PreemptableQueue(PreemptableQueue parent) {
this.parent = parent;
}
public PreemptableQueue(Map<String, Resource> totalKillableResources,
Map<String, Map<ContainerId, RMContainer>> killableContainers) {
this.totalKillableResources = totalKillableResources;
this.killableContainers = killableContainers;
}
void addKillableContainer(KillableContainer container) {
String partition = container.getNodePartition();
if (!totalKillableResources.containsKey(partition)) {
totalKillableResources.put(partition, Resources.createResource(0));
killableContainers.put(partition,
new ConcurrentSkipListMap<ContainerId, RMContainer>());
}
RMContainer c = container.getRMContainer();
Resources.addTo(totalKillableResources.get(partition),
c.getAllocatedResource());
killableContainers.get(partition).put(c.getContainerId(), c);
if (null != parent) {
parent.addKillableContainer(container);
}
}
void removeKillableContainer(KillableContainer container) {
String partition = container.getNodePartition();
Map<ContainerId, RMContainer> partitionKillableContainers =
killableContainers.get(partition);
if (partitionKillableContainers != null) {
RMContainer rmContainer = partitionKillableContainers.remove(
container.getRMContainer().getContainerId());
if (null != rmContainer) {
Resources.subtractFrom(totalKillableResources.get(partition),
rmContainer.getAllocatedResource());
}
}
if (null != parent) {
parent.removeKillableContainer(container);
}
}
public Resource getKillableResource(String partition) {
Resource res = totalKillableResources.get(partition);
return res == null ? Resources.none() : res;
}
@SuppressWarnings("unchecked")
public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
Map<ContainerId, RMContainer> map = killableContainers.get(partition);
return map == null ? Collections.EMPTY_MAP : map;
}
public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
return killableContainers;
}
Map<String, Resource> getTotalKillableResources() {
return totalKillableResources;
}
}
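Because PreemptableQueue keeps a pointer to its parent and propagates add/remove calls upward, a container marked killable under a leaf queue is accounted for in every ancestor as well. The small sketch below illustrates that behaviour; it is not part of the patch, and it assumes it lives in the same org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption package (addKillableContainer/removeKillableContainer are package-private) and that Mockito is available to mock RMContainer.
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class PreemptableQueueSketch {
  public static void main(String[] args) {
    // Two-level hierarchy: root <- leaf "a"
    PreemptableQueue root = new PreemptableQueue(null);
    PreemptableQueue leafA = new PreemptableQueue(root);
    // A mocked 2G container allocated in the default (empty) partition
    RMContainer c = mock(RMContainer.class);
    ContainerId cid = ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0L, 1), 1), 1L);
    when(c.getContainerId()).thenReturn(cid);
    when(c.getAllocatedResource()).thenReturn(Resource.newInstance(2048, 1));
    KillableContainer kc = new KillableContainer(c, "", "a");
    leafA.addKillableContainer(kc);
    // Both the leaf and its parent now account for the killable 2G
    System.out.println(leafA.getKillableResource("")); // 2G, 1 vcore
    System.out.println(root.getKillableResource(""));  // 2G, 1 vcore
    leafA.removeKillableContainer(kc);
    System.out.println(root.getKillableResource(""));  // back to zero
  }
}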

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PreemptionManager {
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
private Map<String, PreemptableQueue> entities = new HashMap<>();
public PreemptionManager() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void refreshQueues(CSQueue parent, CSQueue current) {
try {
writeLock.lock();
PreemptableQueue parentEntity = null;
if (parent != null) {
parentEntity = entities.get(parent.getQueueName());
}
if (!entities.containsKey(current.getQueueName())) {
entities.put(current.getQueueName(),
new PreemptableQueue(parentEntity));
}
if (current.getChildQueues() != null) {
for (CSQueue child : current.getChildQueues()) {
refreshQueues(current, child);
}
}
}
finally {
writeLock.unlock();
}
}
public void addKillableContainer(KillableContainer container) {
try {
writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.addKillableContainer(container);
}
}
finally {
writeLock.unlock();
}
}
public void removeKillableContainer(KillableContainer container) {
try {
writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.removeKillableContainer(container);
}
}
finally {
writeLock.unlock();
}
}
public void moveKillableContainer(KillableContainer oldContainer,
KillableContainer newContainer) {
// TODO, will be called when partition of the node changed OR
// container moved to different queue
}
public void updateKillableContainerResource(KillableContainer container,
Resource oldResource, Resource newResource) {
// TODO, will be called when container's resource changed
}
@VisibleForTesting
public Map<ContainerId, RMContainer> getKillableContainersMap(
String queueName, String partition) {
try {
readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Map<ContainerId, RMContainer> containers =
entity.getKillableContainers().get(partition);
if (containers != null) {
return containers;
}
}
return Collections.emptyMap();
}
finally {
readLock.unlock();
}
}
public Iterator<RMContainer> getKillableContainers(String queueName,
String partition) {
return getKillableContainersMap(queueName, partition).values().iterator();
}
public Resource getKillableResource(String queueName, String partition) {
try {
readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Resource res = entity.getTotalKillableResources().get(partition);
if (res == null || res.equals(Resources.none())) {
return Resources.none();
}
return Resources.clone(res);
}
return Resources.none();
}
finally {
readLock.unlock();
}
}
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
try {
readLock.lock();
Map<String, PreemptableQueue> map = new HashMap<>();
for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
String key = entry.getKey();
PreemptableQueue entity = entry.getValue();
map.put(key, new PreemptableQueue(
new HashMap<>(entity.getTotalKillableResources()),
new HashMap<>(entity.getKillableContainers())));
}
return map;
} finally {
readLock.unlock();
}
}
}
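PreemptionManager is the scheduler-side registry the policy syncs against: refreshQueues() mirrors the CSQueue hierarchy into PreemptableQueue entities, killable decisions are reported through addKillableContainer(), and readers either query per queue/partition or take a shallow copy. A hedged usage sketch follows; PreemptionManagerSketch is an illustrative name only, and the sketch assumes Mockito is available for the CSQueue and RMContainer mocks.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
public class PreemptionManagerSketch {
  public static void main(String[] args) {
    // Mirror a minimal queue hierarchy (root -> a) into the manager
    CSQueue root = mock(CSQueue.class);
    CSQueue a = mock(CSQueue.class);
    when(root.getQueueName()).thenReturn("root");
    when(a.getQueueName()).thenReturn("a");
    when(root.getChildQueues()).thenReturn(Arrays.asList(a)); // "a" has no children (mock returns null)
    PreemptionManager pm = new PreemptionManager();
    pm.refreshQueues(null, root);
    // Report a 1G container under leaf queue "a", default partition, as killable
    RMContainer c = mock(RMContainer.class);
    ContainerId cid = ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0L, 1), 1), 1L);
    when(c.getContainerId()).thenReturn(cid);
    when(c.getAllocatedResource()).thenReturn(Resource.newInstance(1024, 1));
    pm.addKillableContainer(new KillableContainer(c, "", "a"));
    System.out.println(pm.getKillableResource("a", ""));             // 1G on the leaf
    System.out.println(pm.getKillableResource("root", ""));          // aggregated on the parent
    System.out.println(pm.getKillableContainersMap("a", "").size()); // 1
  }
}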

View File

@ -120,9 +120,9 @@ public List<AssignmentDetails> getReservationDetails() {
}
private ContainerId getFirstContainerIdFromOperation(Operation op) {
if (null != operationDetails.get(Operation.ALLOCATION)) {
if (null != operationDetails.get(op)) {
List<AssignmentDetails> assignDetails =
operationDetails.get(Operation.ALLOCATION);
operationDetails.get(op);
if (!assignDetails.isEmpty()) {
return assignDetails.get(0).containerId;
}
@ -131,7 +131,7 @@ private ContainerId getFirstContainerIdFromOperation(Operation op) {
}
public ContainerId getFirstAllocatedOrReservedContainerId() {
ContainerId containerId = null;
ContainerId containerId;
containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
if (null != containerId) {
return containerId;

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -94,6 +95,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* to hold the message if its app doesn't get a container from a node
*/
private String appSkipNodeDiagnostics;
private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@ -138,28 +140,30 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
}
containerAllocator = new ContainerAllocator(this, rc, rmContext);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
}
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
public synchronized boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
if (null == liveContainers.remove(rmContainer.getContainerId())) {
if (null == liveContainers.remove(containerId)) {
return false;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(rmContainer.getContainerId());
containersToPreempt.remove(containerId);
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@ -176,7 +180,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer,
return true;
}
synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
@ -200,7 +204,9 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
@ -213,17 +219,17 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ containerId.getApplicationAttemptId()
+ " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
getApplicationId(), containerId);
return rmContainer;
}

View File

@ -18,22 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
private Map<ContainerId, RMContainer> killableContainers = new HashMap<>();
private Resource totalKillableResources = Resource.newInstance(0, 0);
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> nodeLabels) {
@ -92,7 +99,6 @@ public synchronized void reserveResource(
@Override
public synchronized void unreserveResource(
SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption
if (getReservedContainer() != null
&& getReservedContainer().getContainer() != null
@ -115,4 +121,55 @@ && getReservedContainer().getContainer().getId()
}
setReservedContainer(null);
}
// According to decisions from the preemption policy, mark the container killable
public synchronized void markContainerToKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
if (c != null && !killableContainers.containsKey(containerId)) {
killableContainers.put(containerId, c);
Resources.addTo(totalKillableResources, c.getAllocatedResource());
}
}
// According to decisions from the preemption policy, mark the container
// non-killable
public synchronized void markContainerToNonKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
if (c != null && killableContainers.containsKey(containerId)) {
killableContainers.remove(containerId);
Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());
}
}
@Override
protected synchronized void updateResource(
Container container) {
super.updateResource(container);
if (killableContainers.containsKey(container.getId())) {
Resources.subtractFrom(totalKillableResources, container.getResource());
killableContainers.remove(container.getId());
}
}
@Override
protected synchronized void changeContainerResource(ContainerId containerId,
Resource deltaResource, boolean increase) {
super.changeContainerResource(containerId, deltaResource, increase);
if (killableContainers.containsKey(containerId)) {
if (increase) {
Resources.addTo(totalKillableResources, deltaResource);
} else {
Resources.subtractFrom(totalKillableResources, deltaResource);
}
}
}
public synchronized Resource getTotalKillableResources() {
return totalKillableResources;
}
public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
return killableContainers;
}
}
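On the node side, markContainerToKillable/markContainerToNonKillable above only do bookkeeping: they move a launched container in or out of the killable map and keep totalKillableResources in sync, so the allocator's loop earlier in this commit can cheaply see how much space killing would free. A self-contained toy model of that bookkeeping with plain memory sizes is sketched below; all names are illustrative and this is not the YARN API.
import java.util.HashMap;
import java.util.Map;
public class NodeKillableBookkeepingSketch {
  private final Map<String, Long> launchedContainersMb = new HashMap<>();
  private final Map<String, Long> killableContainersMb = new HashMap<>();
  private long totalKillableMb = 0;
  void launchContainer(String containerId, long mb) {
    launchedContainersMb.put(containerId, mb);
  }
  // Mirror of markContainerToKillable: only launched, not-yet-killable containers count
  synchronized void markContainerToKillable(String containerId) {
    Long mb = launchedContainersMb.get(containerId);
    if (mb != null && !killableContainersMb.containsKey(containerId)) {
      killableContainersMb.put(containerId, mb);
      totalKillableMb += mb;
    }
  }
  // Mirror of markContainerToNonKillable: revert a previous killable decision
  synchronized void markContainerToNonKillable(String containerId) {
    Long mb = killableContainersMb.remove(containerId);
    if (mb != null) {
      totalKillableMb -= mb;
    }
  }
  public static void main(String[] args) {
    NodeKillableBookkeepingSketch node = new NodeKillableBookkeepingSketch();
    node.launchContainer("c1", 1024);
    node.launchContainer("c2", 2048);
    node.markContainerToKillable("c1");
    node.markContainerToKillable("c2");
    System.out.println(node.totalKillableMb); // 3072
    node.markContainerToNonKillable("c1");    // the policy no longer needs c1
    System.out.println(node.totalKillableMb); // 2048
  }
}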

View File

@ -38,10 +38,15 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
// Source: SchedulingEditPolicy
/* Source: SchedulingEditPolicy */
KILL_RESERVED_CONTAINER,
MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption
// in the near future
KILL_PREEMPTED_CONTAINER // Kill a container previously marked for
// preemption
// Mark a container for preemption
MARK_CONTAINER_FOR_PREEMPTION,
// Mark a container previously marked for preemption as killable
MARK_CONTAINER_FOR_KILLABLE,
// Cancel a killable container
MARK_CONTAINER_FOR_NONKILLABLE
}

View File

@ -59,7 +59,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
rmDispatcher.getEventHandler().handle(event1);
ContainerPreemptEvent event2 =
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.KILL_PREEMPTED_CONTAINER);
SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE);
rmDispatcher.getEventHandler().handle(event2);
ContainerPreemptEvent event3 =
new ContainerPreemptEvent(appAttemptId, container,
@ -70,7 +70,7 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
verify(sched, times(3)).handle(any(SchedulerEvent.class));
verify(sched).killReservedContainer(container);
verify(sched).markContainerForPreemption(appAttemptId, container);
verify(sched).killPreemptedContainer(container);
verify(sched).markContainerForKillable(container);
} catch (InterruptedException e) {
Assert.fail();
} finally {

View File

@ -2352,7 +2352,7 @@ public void testRMRestartAfterPreemption() throws Exception {
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app0.getApplicationId()).getCurrentAppAttempt();
// kill app0-attempt
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
app0.getCurrentAppAttempt().getMasterContainer().getId()));
am0.waitForState(RMAppAttemptState.FAILED);
}

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@ -566,7 +565,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt;
scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@ -582,7 +581,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
// Preempt the second attempt.
ContainerId amContainer2 =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2));
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@ -677,7 +676,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Forcibly preempt the am container;
scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());

View File

@ -23,7 +23,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -75,6 +75,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -167,6 +168,7 @@ public void setup() {
when(mCS.getConfiguration()).thenReturn(schedConf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
Dispatcher disp = mock(Dispatcher.class);
@ -289,7 +291,7 @@ public void testExpireKill() {
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId());
assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
}
}

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -123,6 +124,7 @@ public void setup() {
mClock = mock(Clock.class);
cs = mock(CapacityScheduler.class);
when(cs.getResourceCalculator()).thenReturn(rc);
when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
nlm = mock(RMNodeLabelsManager.class);
mDisp = mock(EventHandler.class);

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -264,6 +265,7 @@ public void testLimitsComputation() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource =

View File

@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception {
if (++counter > 2) {
break;
}
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check node report, 12 GB used and 4 GB available
@ -512,7 +512,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 2) {
break;
}
cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
@ -542,7 +542,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 1) {
break;
}
cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}

View File

@ -1188,7 +1188,7 @@ public void testPreemptionInfo() throws Exception {
// kill the 3 containers
for (Container c : allocatedContainers) {
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@ -1197,7 +1197,7 @@ public void testPreemptionInfo() throws Exception {
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
// kill app0-attempt0 AM container
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId()));
// wait for app0 failed
@ -1220,7 +1220,7 @@ public void testPreemptionInfo() throws Exception {
allocatedContainers =
am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
for (Container c : allocatedContainers) {
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@ -1269,7 +1269,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
}
// Call killContainer to preempt the container
cs.killPreemptedContainer(rmContainer);
cs.markContainerForKillable(rmContainer);
Assert.assertEquals(3, requests.size());
for (ResourceRequest request : requests) {

View File

@ -0,0 +1,677 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerPreemption {
private static final Log LOG = LogFactory.getLog(
TestCapacitySchedulerPreemption.class);
private final int GB = 1024;
private Configuration conf;
RMNodeLabelsManager mgr;
Clock clock;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
// Set preemption related configurations
conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
0);
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
true);
conf.setFloat(
ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
conf.setFloat(
ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
mgr = new NullRMNodeLabelsManager();
mgr.init(this.conf);
clock = mock(Clock.class);
when(clock.getTime()).thenReturn(0L);
}
private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
RMActiveServices activeServices = rm.getRMActiveService();
SchedulingMonitor mon = null;
for (Service service : activeServices.getServices()) {
if (service instanceof SchedulingMonitor) {
mon = (SchedulingMonitor) service;
break;
}
}
if (mon != null) {
return mon.getSchedulingEditPolicy();
}
return null;
}
@Test (timeout = 60000)
public void testSimplePreemption() throws Exception {
/**
* Test case: Submit two applications (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) Two nodes in the cluster, each of them has 4G.
*
* 2) app1 is submitted to queue-a first; it asks for 7 * 1G containers, so no
* more resources are available.
*
* 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
*
* Now the cluster is fully used.
*
* 4) app2 asks for another 1G container; the system will preempt one container
* from app1, and app2 will receive the preempted container
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if one container from app1 marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(1, killableContainers.size());
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 6 containers, and app2 has 2 containers
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionConsidersNodeLocalityDelay()
throws Exception {
/**
* Test case: same as testSimplePreemption steps 1-3.
*
* Step 4: app2 asks for a 1G container with locality specified, so it needs
* to wait for missed opportunities before getting scheduled.
* Check that the system waits for missed opportunities before killing a killable container
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container with unknown host and unknown rack
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "unknownhost",
Resources.createResource(1 * GB), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "/default-rack",
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if one container from app1 marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (no container preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Do allocation again, one container will be preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// App1 has 6 containers, and app2 has 2 containers (new container allocated)
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionConsidersHardNodeLocality()
throws Exception {
/**
* Test case: same as testSimplePreemption steps 1-3.
*
* Step 4: app2 asks for a 1G container with hard locality specified, and
* the requested host does not exist.
* Confirm the system doesn't preempt any container.
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container for h3 with hard locality,
// h3 doesn't exist in the cluster
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1, true), ResourceRequest
.newInstance(Priority.newInstance(1), "h3",
Resources.createResource(1 * GB), 1, false), ResourceRequest
.newInstance(Priority.newInstance(1), "/default-rack",
Resources.createResource(1 * GB), 1, false)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if one container from app1 marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
.getApplicationAttemptId(), am1.getApplicationAttemptId());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (no container preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Do allocation again, nothing will be preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// App1 has 7 containers, and app2 has 1 container (no container allocated)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test (timeout = 60000)
public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
throws Exception {
/**
* Test case:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
* Submit applications to two queues, one uses more than the other, so
* preemption will happen.
*
* Check:
* 1) Killable containers' resources will be excluded from PCPP (no duplicate
* containers added to the killable list)
* 2) When more resources need to be preempted, new containers will be selected
* and killable containers will be considered
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 6 times for node1
for (int i = 0; i < 6; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// NM1 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if one container from app1 marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
// Check killable containers and to-be-preempted containers in edit policy
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Run edit schedule again, confirm status doesn't changed
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Save current to kill containers
Set<ContainerId> previousKillableContainers = new HashSet<>(
pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
.keySet());
// Update queue-c's resource request from 1 to 2 containers, so one more
// container needs to be preempted
am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
// Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
// and 1 container in killable map
editPolicy.editSchedule();
Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
// Call editPolicy.editSchedule() once more; we should have 2 containers in the killable map
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Check if previous killable containers included by new killable containers
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
Assert.assertTrue(
Sets.difference(previousKillableContainers, killableContainers.keySet())
.isEmpty());
}
@Test (timeout = 60000)
public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
throws Exception {
/**
* Test case:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
* Submit applications to two queues, one uses more than the other, so
* preemption will happen.
*
* Check:
* 1) Containers will be marked as killable
* 2) Cancel resource request
* 3) Killable containers will be removed from both the policy and the scheduler
*/
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 6 times for node1
for (int i = 0; i < 6; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// NM1 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if 3 containers from app1 are marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
PreemptionManager pm = cs.getPreemptionManager();
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
// Change request from 3G to 2G; now one fewer container needs to be preempted (3 -> 2)
am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
// Call editSchedule once more to make sure still nothing happens
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
}
@Test (timeout = 60000)
public void testPreemptionConsidersUserLimit()
throws Exception {
/**
* Test case: Submit two applications (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more than 1 container in queue-c
*
* 1) Two nodes in the cluster, each of them has 4G.
*
* 2) app1 is submitted to queue-a first; it asks for 7 * 1G containers, so no
* more resources are available.
*
* 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
*
* Now the cluster is fully used.
*
* 4) app2 asks for another 1G container; the system will preempt one container
* from app1, and app2 will receive the preempted container
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
MockRM rm1 = new MockRM(csConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation 3 times for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, and no available resource for cluster
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// NM1/NM2 has available resource = 0G
Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
.getUnallocatedResource().getMemory());
// AM asks for a 1 * GB container
am2.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(1), ResourceRequest.ANY,
Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
// Call edit schedule twice, and check if no container from app1 marked
// to be "killable"
editPolicy.editSchedule();
editPolicy.editSchedule();
// No preemption happens
PreemptionManager pm = cs.getPreemptionManager();
Map<ContainerId, RMContainer> killableContainers =
waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
Assert.assertEquals(0, killableContainers.size());
// Call CS.handle once to see if container preempted
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
// App1 has 7 containers, and app2 has 1 container (nothing preempted)
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
rm1.close();
}
private Map<ContainerId, RMContainer> waitKillableContainersSize(
PreemptionManager pm, String queueName, String partition,
int expectedSize) throws InterruptedException {
Map<ContainerId, RMContainer> killableContainers =
pm.getKillableContainersMap(queueName, partition);
int wait = 0;
// Wait for at most 5 sec (it should be super fast actually)
while (expectedSize != killableContainers.size() && wait < 500) {
killableContainers = pm.getKillableContainersMap(queueName, partition);
Thread.sleep(10);
wait++;
}
Assert.assertEquals(expectedSize, killableContainers.size());
return killableContainers;
}
}

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -99,6 +100,7 @@ public void setUp() throws Exception {
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -150,6 +151,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
@ -3092,6 +3094,7 @@ private CapacitySchedulerContext mockCSContext(
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(2 * GB, 2));
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
return csContext;
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -1676,4 +1677,100 @@ public RMNodeLabelsManager createNodeLabelManager() {
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
}
@Test
public void testParentQueueMaxCapsAreRespected() throws Exception {
/*
* Queue tree:
* Root
* / \
* A B
* / \
* A1 A2
*
* A has 50% capacity and 50% max capacity (of label=x)
* A1/A2 has 50% capacity and 100% max capacity (of label=x)
* Cluster has one node (label=x) with resource = 24G.
* So we can use at most 12G of resources under queueA.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 10);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 90);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50);
// Define 2nd-level queues
csConf.setQueues(A, new String[] { "a1",
"a2"});
final String A1 = A + ".a1";
csConf.setCapacity(A1, 50);
csConf.setAccessibleNodeLabels(A1, toSet("x"));
csConf.setCapacityByLabel(A1, "x", 50);
csConf.setMaximumCapacityByLabel(A1, "x", 100);
csConf.setUserLimitFactor(A1, 100.0f);
final String A2 = A + ".a2";
csConf.setCapacity(A2, 50);
csConf.setAccessibleNodeLabels(A2, toSet("x"));
csConf.setCapacityByLabel(A2, "x", 50);
csConf.setMaximumCapacityByLabel(A2, "x", 100);
csConf.setUserLimitFactor(A2, 100.0f);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 =
new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
nm1.registerNode();
// Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
// Try to launch app2 in a2 with a 2GB AM; this should succeed
RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
// am2 asks for more resources; this cannot succeed because current usage = 9G (app1)
// + 2G (app2 AM) = 11G, and even one more 2G container would exceed queue A's 12G max capacity
am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
}
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -92,6 +93,7 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -126,6 +127,7 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf);

View File

@ -356,4 +356,40 @@ public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
}
/**
* Get a queue structure:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*/
public static Configuration
getConfigurationWithMultipleQueues(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b", "c" });
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 100);
conf.setUserLimitFactor(A, 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
conf.setMaximumCapacity(B, 100);
conf.setUserLimitFactor(B, 100);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 100);
conf.setUserLimitFactor(C, 100);
return conf;
}
}
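A minimal sketch of how a test might consume this helper (the class name TestUtils and the MockRM wiring are assumptions for illustration; the helper itself only returns the configuration):

  Configuration conf = TestUtils.getConfigurationWithMultipleQueues(new Configuration());
  // Hypothetical usage: start a MockRM against the three-queue (a/b/c) layout.
  MockRM rm = new MockRM(conf);
  rm.start();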

View File

@ -1451,7 +1451,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
// Trigger container rescheduled event
scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
SchedulerEventType.KILL_PREEMPTED_CONTAINER));
SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app
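The snippet above stops at fetching the recovered requests; as a hedged sketch, the follow-up check would typically look like this (the exact assertion is illustrative, not taken from the source):

  // After the MARK_CONTAINER_FOR_KILLABLE event, the killed container's
  // ResourceRequests are re-added to the application, so the recovered list
  // should no longer be empty.
  Assert.assertFalse(requests.isEmpty());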